Files
tradon/model/model.py
2026-01-10 10:58:17 +01:00

523 lines
18 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import collections.abc
import copy
import sys
from collections import defaultdict
from functools import total_ordering
from itertools import chain
from trytond.i18n import lazy_gettext
from trytond.model import fields
from trytond.pool import Pool, PoolBase, PoolMeta
from trytond.pyson import PYSONDecoder, PYSONEncoder
from trytond.rpc import RPC
from trytond.transaction import Transaction
from trytond.url import URLMixin
__all__ = ['Model']
class ModelMeta(PoolMeta):
@property
def __queue__(self):
pool = Pool()
Queue = pool.get('ir.queue')
return Queue.caller(self)
@total_ordering
class Model(URLMixin, PoolBase, metaclass=ModelMeta):
"""
Define a model in Tryton.
"""
__slots__ = (
'_id', '_values', '_init_values', '_removed', '_deleted',
'__weakref__')
_rec_name = 'name'
id = fields.Integer(lazy_gettext('ir.msg_ID'), readonly=True)
@classmethod
def __setup__(cls):
super(Model, cls).__setup__()
cls.__rpc__ = {
'default_get': RPC(cache=dict(seconds=5 * 60)),
'fields_get': RPC(cache=dict(days=1)),
'pre_validate': RPC(instantiate=0),
}
cls.__access__ = set()
# Copy fields and update depends
for attr in dir(cls):
if attr.startswith('_'):
continue
if not isinstance(getattr(cls, attr), fields.Field):
continue
field_name = attr
field = getattr(cls, field_name)
# Copy the original field definition to prevent side-effect with
# the mutable attributes
for parent_cls in cls.__mro__:
parent_field = getattr(parent_cls, field_name, None)
if isinstance(parent_field, fields.Field):
field = parent_field
field = copy.deepcopy(field)
setattr(cls, field_name, field)
@classmethod
def __post_setup__(cls):
super(Model, cls).__post_setup__()
# Set _fields
cls._fields = {}
for attr in dir(cls):
if attr.startswith('_'):
continue
if isinstance(getattr(cls, attr), fields.Field):
cls._fields[attr] = getattr(cls, attr)
cls._record = record(cls.__name__ + '._record', cls._fields.keys())
# Set _defaults
cls._defaults = {}
fields_names = list(cls._fields.keys())
for field_name in fields_names:
default_method = getattr(cls, 'default_%s' % field_name, False)
if callable(default_method):
cls._defaults[field_name] = default_method
for k in cls._defaults:
assert k in cls._fields, \
'Default function defined in %s but field %s does not exist!' \
% (cls.__name__, k,)
# Set name to fields
for name, field in cls._fields.items():
if field.name is None:
field.name = name
else:
assert field.name == name, (
'Duplicate fields on %s: %s, %s'
% (cls, field.name, name))
@classmethod
def _get_name(cls):
if cls.__doc__ is None:
print("\n💥 MODELE SANS DOCSTRING :", cls.__name__, " (module:", cls.__module__, ")")
raise Exception("MODELE SANS DOCSTRING")
lines = cls.__doc__.splitlines()
if lines:
return lines[0]
return cls.__name__
# @classmethod
# def _get_name(cls):
# '''
# Returns the first non-empty line of the model docstring.
# '''
# assert cls.__doc__, '%s has no docstring' % cls
# lines = cls.__doc__.splitlines()
# for line in lines:
# line = line.strip()
# if line:
# return line
@classmethod
def __register__(cls, module_name):
"""
Add model in ir.model and ir.model.field.
"""
super(Model, cls).__register__(module_name)
pool = Pool()
Translation = pool.get('ir.translation')
Model_ = pool.get('ir.model')
ModelField = pool.get('ir.model.field')
model_id = Model_.register(cls, module_name)
ModelField.register(cls, module_name, model_id)
Translation.register_model(cls, module_name)
Translation.register_fields(cls, module_name)
@classmethod
def default_get(cls, fields_names, with_rec_name=True):
'''
Return a dict with the default values for each field in fields_names.
If with_rec_name is True, rec_name will be added.
'''
pool = Pool()
value = {}
context = Transaction().context
default_rec_name = context.get('default_rec_name')
if (default_rec_name
and cls._rec_name in cls._fields
and cls._rec_name in fields_names):
value[cls._rec_name] = default_rec_name
# get the default values defined in the object
for field_name in fields_names:
default_name = f'default_{field_name}'
if field_name in cls._fields and default_name in context:
value[field_name] = context.get(default_name)
elif field_name in cls._defaults:
value[field_name] = cls._defaults[field_name]()
field = cls._fields[field_name]
if (field._type == 'boolean'
and field_name not in value):
value[field_name] = False
if (with_rec_name
and field._type in ('many2one',)
and value.get(field_name)):
Target = pool.get(field.model_name)
if 'rec_name' in Target._fields:
value.setdefault(
field_name + '.', {})['rec_name'] = Target(
value[field_name]).rec_name
return value
@classmethod
def fields_get(cls, fields_names=None, level=0):
"""
Return the definition of each field on the model.
"""
definition = {}
pool = Pool()
Translation = pool.get('ir.translation')
FieldAccess = pool.get('ir.model.field.access')
ModelAccess = pool.get('ir.model.access')
# Add translation to cache
language = Transaction().language
trans_args = []
for fname, field in cls._fields.items():
if fields_names and fname not in fields_names:
continue
trans_args.extend(field.definition_translations(cls, language))
Translation.get_sources(trans_args)
encoder = PYSONEncoder()
decoder = PYSONDecoder(noeval=True)
accesses = FieldAccess.get_access([cls.__name__])[cls.__name__]
for fname, field in cls._fields.items():
if fields_names and fname not in fields_names:
continue
definition[fname] = field.definition(cls, language)
if not accesses.get(fname, {}).get('write', True):
definition[fname]['readonly'] = True
states = decoder.decode(definition[fname]['states'])
states.pop('readonly', None)
definition[fname]['states'] = encoder.encode(states)
for right in ['create', 'delete']:
definition[fname][right] = accesses.get(
fname, {}).get(right, True)
if level > 0:
relation = definition[fname].get('relation')
if relation:
Relation = pool.get(relation)
relation_fields = Relation.fields_get(level=level - 1)
definition[fname]['relation_fields'] = relation_fields
for name, props in relation_fields.items():
# Convert selection into list
if isinstance(props.get('selection'), str):
change_with = props.get('selection_change_with')
if change_with:
selection = getattr(
Relation(), props['selection'])()
else:
selection = getattr(
Relation, props['selection'])()
props['selection'] = selection
schema = definition[fname].get('schema_model')
if schema:
Schema = pool.get(schema)
definition[fname]['relation_fields'] = (
Schema.get_relation_fields())
for fname in list(definition.keys()):
# filter out fields which aren't in the fields_names list
if fields_names:
if fname not in fields_names:
del definition[fname]
elif not ModelAccess.check_relation(
cls.__name__, fname, mode='read'):
del definition[fname]
return definition
def pre_validate(self):
pass
@classmethod
def __names__(cls, field=None, record=None):
pool = Pool()
IrModel = pool.get('ir.model')
IrModelField = pool.get('ir.model.field')
def format_value(value):
ffield = getattr(cls, field, None)
if isinstance(value, Model):
try:
return value.rec_name
except Exception:
return str(value.id)
elif ffield and ffield._type in {'selection', 'multiselection'}:
selection = ffield.get_selection(cls, field, record)
return ffield.get_selection_string(selection, value)
else:
return str(value)
names = {
'model': IrModel.get_name(cls.__name__),
}
if field:
names['field'] = IrModelField.get_name(cls.__name__, field)
if record:
try:
names['record'] = record.rec_name
except Exception:
names['record'] = record.id
if field:
value = getattr(record, field, None)
if isinstance(value, (tuple, list)):
value = ', '.join(map(format_value, value))
else:
value = format_value(value)
names['value'] = value
return names
def __init__(self, id=None, **kwargs):
super(Model, self).__init__()
if id is not None:
id = int(id)
self._id = id
self._deleted = self._removed = None
if kwargs:
self._values = self._record()
parent_values = defaultdict(dict)
has_context = {}
for name, value in kwargs.items():
if not name.startswith('_parent_'):
setattr(self, name, value)
else:
name, field = name.split('.', 1)
name = name[len('_parent_'):]
parent_values[name][field] = value
value = parent_values[name]
if getattr(self.__class__, name).context:
has_context[name] = value
for name, value in parent_values.items():
setattr(self, name, value)
# Set field with context a second times
# to ensure it was evaluated with all the fields
for name, value in has_context.items():
setattr(self, name, value)
self._init_values = self._values._copy()
else:
self._values = None
self._init_values = None
def __copy__(self):
copied = self.__class__(self.id)
copied._values = copy.copy(self._values)
copied._init_values = copy.copy(self._init_values)
return copied
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
raise AttributeError
try:
return self._values[name]
except (KeyError, TypeError):
raise AttributeError("'%s' Model has no attribute '%s': %s"
% (self.__name__, name, self._values))
def __contains__(self, name):
return name in self._fields
def __int__(self):
return int(self.id)
def __str__(self):
return '%s,%s' % (self.__name__, self.id)
def __repr__(self):
if self.id is None or self.id < 0:
return "Pool().get('%s')(**%s)" % (self.__name__,
repr(self._default_values))
else:
return "Pool().get('%s')(%s)" % (self.__name__, self.id)
def __eq__(self, other):
if not isinstance(other, Model):
return NotImplemented
elif self.id is None or other.id is None:
return id(self) == id(other)
return (self.__name__, self.id) == (other.__name__, other.id)
def __lt__(self, other):
if not isinstance(other, Model) or self.__name__ != other.__name__:
return NotImplemented
return self.id < other.id
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash((self.__name__, id(self) if self.id is None else self.id))
def __bool__(self):
return True
@property
def _default_values(self):
"""Return the values not stored.
By default, the value of a field is its internal representation except:
- for Many2One and One2One field: the id
- for Reference field: the string model,id
- for Many2Many: the list of ids
- for One2Many: the list of `_default_values`
"""
values = {}
if self._values:
for fname, value in self._values._items():
field = self._fields[fname]
if field._type in ('many2one', 'one2one', 'reference'):
if value:
if field._type == 'reference':
value = str(value)
else:
value = value.id
elif field._type in ('one2many', 'many2many'):
if field._type == 'one2many':
value = [r._default_values for r in value]
else:
value = [r.id for r in value]
values[fname] = value
return values
def record(name, field_names):
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def _getitem(self, field):
try:
return getattr(self, field)
except AttributeError:
raise KeyError(field)
def _setitem(self, field, value):
try:
return setattr(self, field, value)
except AttributeError:
raise KeyError(field)
def _contains(self, field):
try:
getattr(self, field)
return True
except AttributeError:
return False
def _clear(self):
for fname in self.__slots__:
try:
delattr(self, fname)
except AttributeError:
pass
def _copy(self):
return copy.copy(self)
def _get(self, field, default=None):
if field not in self.__slots__:
raise KeyError(field)
return getattr(self, field, default)
def _keys(self):
for fname in self.__slots__:
if hasattr(self, fname):
yield fname
def _items(self):
for fname in self.__slots__:
try:
yield fname, getattr(self, fname)
except AttributeError:
pass
_undefined = object()
def _pop(self, field, value=_undefined):
if field not in self.__slots__:
raise KeyError(field)
if value != _undefined:
value = getattr(self, field, value)
else:
try:
value = getattr(self, field)
except AttributeError:
raise KeyError(field)
try:
delattr(self, field)
except AttributeError:
pass
return value
def _popitem(self, field, value=_undefined):
return (field, self._pop(field, value=value))
def _setdefault(self, field, default=None):
try:
return getattr(self, field)
except AttributeError:
setattr(self, field, default)
return default
def _update(self, _other=None, **kwargs):
if isinstance(_other, collections.abc.Mapping):
_other = _other.items()
elif _other is None:
_other = []
chained = chain(_other, kwargs.items())
for key, value in chained:
setattr(self, key, value)
def _values(self):
for fname in self.__slots__:
try:
yield getattr(self, fname)
except AttributeError:
pass
field_names = set(field_names)
for fname in field_names:
if fname.startswith('_'):
raise ValueError(
"Field names cannot start with an underscore: %r" % name)
field_names = tuple(map(sys.intern, field_names))
type_dict = {
'__slots__': field_names,
'__init__': __init__,
'__getitem__': _getitem,
'__setitem__': _setitem,
'__contains__': _contains,
'_clear': _clear,
'_copy': _copy,
'_get': _get,
'_keys': _keys,
'_items': _items,
'_pop': _pop,
'_popitem': _popitem,
'_setdefault': _setdefault,
'_update': _update,
'_values': _values,
}
return type(name, (), type_dict)