523 lines
18 KiB
Python
Executable File
523 lines
18 KiB
Python
Executable File
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
|
|
import collections.abc
|
|
import copy
|
|
import sys
|
|
from collections import defaultdict
|
|
from functools import total_ordering
|
|
from itertools import chain
|
|
|
|
from trytond.i18n import lazy_gettext
|
|
from trytond.model import fields
|
|
from trytond.pool import Pool, PoolBase, PoolMeta
|
|
from trytond.pyson import PYSONDecoder, PYSONEncoder
|
|
from trytond.rpc import RPC
|
|
from trytond.transaction import Transaction
|
|
from trytond.url import URLMixin
|
|
|
|
__all__ = ['Model']
|
|
|
|
|
|
class ModelMeta(PoolMeta):
|
|
@property
|
|
def __queue__(self):
|
|
pool = Pool()
|
|
Queue = pool.get('ir.queue')
|
|
return Queue.caller(self)
|
|
|
|
|
|
@total_ordering
|
|
class Model(URLMixin, PoolBase, metaclass=ModelMeta):
|
|
"""
|
|
Define a model in Tryton.
|
|
"""
|
|
__slots__ = (
|
|
'_id', '_values', '_init_values', '_removed', '_deleted',
|
|
'__weakref__')
|
|
_rec_name = 'name'
|
|
|
|
id = fields.Integer(lazy_gettext('ir.msg_ID'), readonly=True)
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(Model, cls).__setup__()
|
|
cls.__rpc__ = {
|
|
'default_get': RPC(cache=dict(seconds=5 * 60)),
|
|
'fields_get': RPC(cache=dict(days=1)),
|
|
'pre_validate': RPC(instantiate=0),
|
|
}
|
|
cls.__access__ = set()
|
|
|
|
# Copy fields and update depends
|
|
for attr in dir(cls):
|
|
if attr.startswith('_'):
|
|
continue
|
|
if not isinstance(getattr(cls, attr), fields.Field):
|
|
continue
|
|
field_name = attr
|
|
field = getattr(cls, field_name)
|
|
# Copy the original field definition to prevent side-effect with
|
|
# the mutable attributes
|
|
for parent_cls in cls.__mro__:
|
|
parent_field = getattr(parent_cls, field_name, None)
|
|
if isinstance(parent_field, fields.Field):
|
|
field = parent_field
|
|
field = copy.deepcopy(field)
|
|
setattr(cls, field_name, field)
|
|
|
|
@classmethod
|
|
def __post_setup__(cls):
|
|
super(Model, cls).__post_setup__()
|
|
|
|
# Set _fields
|
|
cls._fields = {}
|
|
for attr in dir(cls):
|
|
if attr.startswith('_'):
|
|
continue
|
|
if isinstance(getattr(cls, attr), fields.Field):
|
|
cls._fields[attr] = getattr(cls, attr)
|
|
cls._record = record(cls.__name__ + '._record', cls._fields.keys())
|
|
|
|
# Set _defaults
|
|
cls._defaults = {}
|
|
fields_names = list(cls._fields.keys())
|
|
for field_name in fields_names:
|
|
default_method = getattr(cls, 'default_%s' % field_name, False)
|
|
if callable(default_method):
|
|
cls._defaults[field_name] = default_method
|
|
|
|
for k in cls._defaults:
|
|
assert k in cls._fields, \
|
|
'Default function defined in %s but field %s does not exist!' \
|
|
% (cls.__name__, k,)
|
|
|
|
# Set name to fields
|
|
for name, field in cls._fields.items():
|
|
if field.name is None:
|
|
field.name = name
|
|
else:
|
|
assert field.name == name, (
|
|
'Duplicate fields on %s: %s, %s'
|
|
% (cls, field.name, name))
|
|
|
|
@classmethod
|
|
def _get_name(cls):
|
|
if cls.__doc__ is None:
|
|
print("\n💥 MODELE SANS DOCSTRING :", cls.__name__, " (module:", cls.__module__, ")")
|
|
raise Exception("MODELE SANS DOCSTRING")
|
|
|
|
lines = cls.__doc__.splitlines()
|
|
if lines:
|
|
return lines[0]
|
|
return cls.__name__
|
|
|
|
# @classmethod
|
|
# def _get_name(cls):
|
|
# '''
|
|
# Returns the first non-empty line of the model docstring.
|
|
# '''
|
|
# assert cls.__doc__, '%s has no docstring' % cls
|
|
# lines = cls.__doc__.splitlines()
|
|
# for line in lines:
|
|
# line = line.strip()
|
|
# if line:
|
|
# return line
|
|
|
|
@classmethod
|
|
def __register__(cls, module_name):
|
|
"""
|
|
Add model in ir.model and ir.model.field.
|
|
"""
|
|
super(Model, cls).__register__(module_name)
|
|
pool = Pool()
|
|
Translation = pool.get('ir.translation')
|
|
Model_ = pool.get('ir.model')
|
|
ModelField = pool.get('ir.model.field')
|
|
|
|
model_id = Model_.register(cls, module_name)
|
|
ModelField.register(cls, module_name, model_id)
|
|
|
|
Translation.register_model(cls, module_name)
|
|
Translation.register_fields(cls, module_name)
|
|
|
|
@classmethod
|
|
def default_get(cls, fields_names, with_rec_name=True):
|
|
'''
|
|
Return a dict with the default values for each field in fields_names.
|
|
If with_rec_name is True, rec_name will be added.
|
|
'''
|
|
pool = Pool()
|
|
value = {}
|
|
context = Transaction().context
|
|
|
|
default_rec_name = context.get('default_rec_name')
|
|
if (default_rec_name
|
|
and cls._rec_name in cls._fields
|
|
and cls._rec_name in fields_names):
|
|
value[cls._rec_name] = default_rec_name
|
|
|
|
# get the default values defined in the object
|
|
for field_name in fields_names:
|
|
default_name = f'default_{field_name}'
|
|
if field_name in cls._fields and default_name in context:
|
|
value[field_name] = context.get(default_name)
|
|
elif field_name in cls._defaults:
|
|
value[field_name] = cls._defaults[field_name]()
|
|
field = cls._fields[field_name]
|
|
if (field._type == 'boolean'
|
|
and field_name not in value):
|
|
value[field_name] = False
|
|
if (with_rec_name
|
|
and field._type in ('many2one',)
|
|
and value.get(field_name)):
|
|
Target = pool.get(field.model_name)
|
|
if 'rec_name' in Target._fields:
|
|
value.setdefault(
|
|
field_name + '.', {})['rec_name'] = Target(
|
|
value[field_name]).rec_name
|
|
return value
|
|
|
|
@classmethod
|
|
def fields_get(cls, fields_names=None, level=0):
|
|
"""
|
|
Return the definition of each field on the model.
|
|
"""
|
|
definition = {}
|
|
pool = Pool()
|
|
Translation = pool.get('ir.translation')
|
|
FieldAccess = pool.get('ir.model.field.access')
|
|
ModelAccess = pool.get('ir.model.access')
|
|
|
|
# Add translation to cache
|
|
language = Transaction().language
|
|
trans_args = []
|
|
for fname, field in cls._fields.items():
|
|
if fields_names and fname not in fields_names:
|
|
continue
|
|
trans_args.extend(field.definition_translations(cls, language))
|
|
Translation.get_sources(trans_args)
|
|
|
|
encoder = PYSONEncoder()
|
|
decoder = PYSONDecoder(noeval=True)
|
|
|
|
accesses = FieldAccess.get_access([cls.__name__])[cls.__name__]
|
|
for fname, field in cls._fields.items():
|
|
if fields_names and fname not in fields_names:
|
|
continue
|
|
definition[fname] = field.definition(cls, language)
|
|
if not accesses.get(fname, {}).get('write', True):
|
|
definition[fname]['readonly'] = True
|
|
states = decoder.decode(definition[fname]['states'])
|
|
states.pop('readonly', None)
|
|
definition[fname]['states'] = encoder.encode(states)
|
|
for right in ['create', 'delete']:
|
|
definition[fname][right] = accesses.get(
|
|
fname, {}).get(right, True)
|
|
if level > 0:
|
|
relation = definition[fname].get('relation')
|
|
if relation:
|
|
Relation = pool.get(relation)
|
|
relation_fields = Relation.fields_get(level=level - 1)
|
|
definition[fname]['relation_fields'] = relation_fields
|
|
for name, props in relation_fields.items():
|
|
# Convert selection into list
|
|
if isinstance(props.get('selection'), str):
|
|
change_with = props.get('selection_change_with')
|
|
if change_with:
|
|
selection = getattr(
|
|
Relation(), props['selection'])()
|
|
else:
|
|
selection = getattr(
|
|
Relation, props['selection'])()
|
|
props['selection'] = selection
|
|
schema = definition[fname].get('schema_model')
|
|
if schema:
|
|
Schema = pool.get(schema)
|
|
definition[fname]['relation_fields'] = (
|
|
Schema.get_relation_fields())
|
|
|
|
for fname in list(definition.keys()):
|
|
# filter out fields which aren't in the fields_names list
|
|
if fields_names:
|
|
if fname not in fields_names:
|
|
del definition[fname]
|
|
elif not ModelAccess.check_relation(
|
|
cls.__name__, fname, mode='read'):
|
|
del definition[fname]
|
|
return definition
|
|
|
|
def pre_validate(self):
|
|
pass
|
|
|
|
@classmethod
|
|
def __names__(cls, field=None, record=None):
|
|
pool = Pool()
|
|
IrModel = pool.get('ir.model')
|
|
IrModelField = pool.get('ir.model.field')
|
|
|
|
def format_value(value):
|
|
ffield = getattr(cls, field, None)
|
|
if isinstance(value, Model):
|
|
try:
|
|
return value.rec_name
|
|
except Exception:
|
|
return str(value.id)
|
|
elif ffield and ffield._type in {'selection', 'multiselection'}:
|
|
selection = ffield.get_selection(cls, field, record)
|
|
return ffield.get_selection_string(selection, value)
|
|
else:
|
|
return str(value)
|
|
|
|
names = {
|
|
'model': IrModel.get_name(cls.__name__),
|
|
}
|
|
if field:
|
|
names['field'] = IrModelField.get_name(cls.__name__, field)
|
|
|
|
if record:
|
|
try:
|
|
names['record'] = record.rec_name
|
|
except Exception:
|
|
names['record'] = record.id
|
|
if field:
|
|
value = getattr(record, field, None)
|
|
if isinstance(value, (tuple, list)):
|
|
value = ', '.join(map(format_value, value))
|
|
else:
|
|
value = format_value(value)
|
|
names['value'] = value
|
|
return names
|
|
|
|
def __init__(self, id=None, **kwargs):
|
|
super(Model, self).__init__()
|
|
if id is not None:
|
|
id = int(id)
|
|
self._id = id
|
|
self._deleted = self._removed = None
|
|
if kwargs:
|
|
self._values = self._record()
|
|
parent_values = defaultdict(dict)
|
|
has_context = {}
|
|
for name, value in kwargs.items():
|
|
if not name.startswith('_parent_'):
|
|
setattr(self, name, value)
|
|
else:
|
|
name, field = name.split('.', 1)
|
|
name = name[len('_parent_'):]
|
|
parent_values[name][field] = value
|
|
value = parent_values[name]
|
|
if getattr(self.__class__, name).context:
|
|
has_context[name] = value
|
|
|
|
for name, value in parent_values.items():
|
|
setattr(self, name, value)
|
|
# Set field with context a second times
|
|
# to ensure it was evaluated with all the fields
|
|
for name, value in has_context.items():
|
|
setattr(self, name, value)
|
|
self._init_values = self._values._copy()
|
|
else:
|
|
self._values = None
|
|
self._init_values = None
|
|
|
|
def __copy__(self):
|
|
copied = self.__class__(self.id)
|
|
copied._values = copy.copy(self._values)
|
|
copied._init_values = copy.copy(self._init_values)
|
|
return copied
|
|
|
|
def __getattr__(self, name):
|
|
if name.startswith('__') and name.endswith('__'):
|
|
raise AttributeError
|
|
try:
|
|
return self._values[name]
|
|
except (KeyError, TypeError):
|
|
raise AttributeError("'%s' Model has no attribute '%s': %s"
|
|
% (self.__name__, name, self._values))
|
|
|
|
def __contains__(self, name):
|
|
return name in self._fields
|
|
|
|
def __int__(self):
|
|
return int(self.id)
|
|
|
|
def __str__(self):
|
|
return '%s,%s' % (self.__name__, self.id)
|
|
|
|
def __repr__(self):
|
|
if self.id is None or self.id < 0:
|
|
return "Pool().get('%s')(**%s)" % (self.__name__,
|
|
repr(self._default_values))
|
|
else:
|
|
return "Pool().get('%s')(%s)" % (self.__name__, self.id)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Model):
|
|
return NotImplemented
|
|
elif self.id is None or other.id is None:
|
|
return id(self) == id(other)
|
|
return (self.__name__, self.id) == (other.__name__, other.id)
|
|
|
|
def __lt__(self, other):
|
|
if not isinstance(other, Model) or self.__name__ != other.__name__:
|
|
return NotImplemented
|
|
return self.id < other.id
|
|
|
|
def __ne__(self, other):
|
|
return not self == other
|
|
|
|
def __hash__(self):
|
|
return hash((self.__name__, id(self) if self.id is None else self.id))
|
|
|
|
def __bool__(self):
|
|
return True
|
|
|
|
@property
|
|
def _default_values(self):
|
|
"""Return the values not stored.
|
|
By default, the value of a field is its internal representation except:
|
|
- for Many2One and One2One field: the id
|
|
- for Reference field: the string model,id
|
|
- for Many2Many: the list of ids
|
|
- for One2Many: the list of `_default_values`
|
|
"""
|
|
values = {}
|
|
if self._values:
|
|
for fname, value in self._values._items():
|
|
field = self._fields[fname]
|
|
if field._type in ('many2one', 'one2one', 'reference'):
|
|
if value:
|
|
if field._type == 'reference':
|
|
value = str(value)
|
|
else:
|
|
value = value.id
|
|
elif field._type in ('one2many', 'many2many'):
|
|
if field._type == 'one2many':
|
|
value = [r._default_values for r in value]
|
|
else:
|
|
value = [r.id for r in value]
|
|
values[fname] = value
|
|
return values
|
|
|
|
|
|
def record(name, field_names):
|
|
def __init__(self, **kwargs):
|
|
for key, value in kwargs.items():
|
|
setattr(self, key, value)
|
|
|
|
def _getitem(self, field):
|
|
try:
|
|
return getattr(self, field)
|
|
except AttributeError:
|
|
raise KeyError(field)
|
|
|
|
def _setitem(self, field, value):
|
|
try:
|
|
return setattr(self, field, value)
|
|
except AttributeError:
|
|
raise KeyError(field)
|
|
|
|
def _contains(self, field):
|
|
try:
|
|
getattr(self, field)
|
|
return True
|
|
except AttributeError:
|
|
return False
|
|
|
|
def _clear(self):
|
|
for fname in self.__slots__:
|
|
try:
|
|
delattr(self, fname)
|
|
except AttributeError:
|
|
pass
|
|
|
|
def _copy(self):
|
|
return copy.copy(self)
|
|
|
|
def _get(self, field, default=None):
|
|
if field not in self.__slots__:
|
|
raise KeyError(field)
|
|
return getattr(self, field, default)
|
|
|
|
def _keys(self):
|
|
for fname in self.__slots__:
|
|
if hasattr(self, fname):
|
|
yield fname
|
|
|
|
def _items(self):
|
|
for fname in self.__slots__:
|
|
try:
|
|
yield fname, getattr(self, fname)
|
|
except AttributeError:
|
|
pass
|
|
|
|
_undefined = object()
|
|
|
|
def _pop(self, field, value=_undefined):
|
|
if field not in self.__slots__:
|
|
raise KeyError(field)
|
|
if value != _undefined:
|
|
value = getattr(self, field, value)
|
|
else:
|
|
try:
|
|
value = getattr(self, field)
|
|
except AttributeError:
|
|
raise KeyError(field)
|
|
try:
|
|
delattr(self, field)
|
|
except AttributeError:
|
|
pass
|
|
return value
|
|
|
|
def _popitem(self, field, value=_undefined):
|
|
return (field, self._pop(field, value=value))
|
|
|
|
def _setdefault(self, field, default=None):
|
|
try:
|
|
return getattr(self, field)
|
|
except AttributeError:
|
|
setattr(self, field, default)
|
|
return default
|
|
|
|
def _update(self, _other=None, **kwargs):
|
|
if isinstance(_other, collections.abc.Mapping):
|
|
_other = _other.items()
|
|
elif _other is None:
|
|
_other = []
|
|
chained = chain(_other, kwargs.items())
|
|
for key, value in chained:
|
|
setattr(self, key, value)
|
|
|
|
def _values(self):
|
|
for fname in self.__slots__:
|
|
try:
|
|
yield getattr(self, fname)
|
|
except AttributeError:
|
|
pass
|
|
|
|
field_names = set(field_names)
|
|
for fname in field_names:
|
|
if fname.startswith('_'):
|
|
raise ValueError(
|
|
"Field names cannot start with an underscore: %r" % name)
|
|
field_names = tuple(map(sys.intern, field_names))
|
|
type_dict = {
|
|
'__slots__': field_names,
|
|
'__init__': __init__,
|
|
'__getitem__': _getitem,
|
|
'__setitem__': _setitem,
|
|
'__contains__': _contains,
|
|
'_clear': _clear,
|
|
'_copy': _copy,
|
|
'_get': _get,
|
|
'_keys': _keys,
|
|
'_items': _items,
|
|
'_pop': _pop,
|
|
'_popitem': _popitem,
|
|
'_setdefault': _setdefault,
|
|
'_update': _update,
|
|
'_values': _values,
|
|
}
|
|
return type(name, (), type_dict)
|