# This file is part of Tryton. The COPYRIGHT file at the top level of # this repository contains the full copyright notices and license terms. import warnings from functools import partial, wraps import sql from sql import ( Cast, Column, CombiningQuery, Expression, Literal, Null, Query, Select, operators) from sql.aggregate import Min from sql.conditionals import Coalesce, NullIf from sql.operators import Concat from trytond import backend from trytond.const import OPERATORS from trytond.pool import Pool from trytond.pyson import PYSON, Eval, PYSONDecoder, PYSONEncoder from trytond.rpc import RPC from trytond.tools import cached_property from trytond.tools.string_ import LazyString, StringPartitioned from trytond.transaction import Transaction _sql_version = tuple(map(int, sql.__version__.split('.'))) def domain_validate(value): assert isinstance(value, list), 'domain must be a list' def test_domain(dom): for arg in dom: if isinstance(arg, str): if arg not in ('AND', 'OR'): return False elif (isinstance(arg, tuple) or (isinstance(arg, list) and len(arg) > 2 and (( isinstance(arg[1], str) and arg[1] in OPERATORS) or ( isinstance(arg[1], PYSON) and arg[1].types() == {str})))): pass elif isinstance(arg, list): if not test_domain(arg): return False return True assert test_domain(value), 'invalid domain' def states_validate(value): assert isinstance(value, dict), 'states must be a dict' assert set(value).issubset({'required', 'readonly', 'invisible'}), ( 'extra keys "%(keys)s" in states' % { 'keys': set(value) - {'required', 'readonly', 'invisible'}, }) for state in value: assert isinstance(value[state], (bool, PYSON)), \ 'values of states must be PYSON' if hasattr(value[state], 'types'): assert value[state].types() == {bool}, \ 'values of states must return boolean' def depends_validate(value): assert isinstance(value, set), 'depends must be a set' def context_validate(value): assert isinstance(value, dict), 'context must be a dict' def size_validate(value): if value is not None: assert isinstance(value, (int, PYSON)), 'size must be PYSON' if hasattr(value, 'types'): assert value.types() <= {int, type(None)}, \ 'size must return integer' def search_order_validate(value): if value is not None: assert isinstance(value, (list, PYSON)), 'search_order must be PYSON' if hasattr(value, 'types'): assert value.types() == {list}, 'search_order must be PYSON' def _set_value(record, field): try: field, nested = field.split('.', 1) except ValueError: nested = None if field.startswith('_parent_'): field = field[8:] # Strip '_parent_' if not hasattr(record, field): default = None if hasattr(record, '_defaults') and field in record._defaults: default = record._defaults[field]() setattr(record, field, default) elif nested: parent = getattr(record, field) if parent: _set_value(parent, nested) def depends(*fields, **kwargs): methods = kwargs.pop('methods', None) assert not kwargs def decorator(func): depends = getattr(func, 'depends', set()) depends.update(fields) setattr(func, 'depends', depends) if methods: depend_methods = getattr(func, 'depend_methods', set()) depend_methods.update(methods) setattr(func, 'depend_methods', depend_methods) @wraps(func) def wrapper(self, *args, **kwargs): for field in fields: _set_value(self, field) return func(self, *args, **kwargs) return wrapper return decorator def _iter_eval_fields(value): "Iterate over evaluated fields" if isinstance(value, Eval): yield value.basename elif isinstance(value, PYSON): yield from _iter_eval_fields(value.pyson()) elif isinstance(value, (list, tuple)): for val in value: yield from _iter_eval_fields(val) elif isinstance(value, dict): for val in value.values(): yield from _iter_eval_fields(val) def get_eval_fields(value): "Return fields evaluated" fields = set(_iter_eval_fields(value)) fields.discard('context') # TODO: remove when context is renamed return fields def instanciate_values(Target, value, **extra): from ..modelstorage import ModelStorage, local_cache kwargs = {} ids = [] if issubclass(Target, ModelStorage): kwargs['_local_cache'] = local_cache(Target) kwargs['_ids'] = ids def instance(data): if isinstance(data, Target): for k, v in extra.items(): setattr(data, k, v) return data elif isinstance(data, dict): if data.get('id', -1) >= 0: values = {} values.update(data) values.update(kwargs) ids.append(data['id']) else: values = data values.update(extra) return Target(**values) else: ids.append(data) return Target(data, **extra, **kwargs) return tuple(instance(x) for x in (value or [])) def instantiate_context(field, record): from ..modelstorage import EvalEnvironment ctx = {} if field.context: pyson_context = PYSONEncoder().encode(field.context) ctx.update(PYSONDecoder( EvalEnvironment(record, record.__class__)).decode( pyson_context)) datetime_ = None if getattr(field, 'datetime_field', None): datetime_ = getattr(record, field.datetime_field, None) ctx = {'_datetime': datetime_} return ctx def on_change_result(record): return record._changed_values def on_change_with_result(fieldname): def decorator(func): @wraps(func) def wrapper(self, *args, **kwargs): value = func(self, *args, **kwargs) setattr(self, fieldname, value) return self._changed_values return wrapper return decorator def domain_method(func): @wraps(func) def wrapper(self, domain, tables, Model): name = domain[0].split('.', 1)[0] assert name == self.name method = getattr(Model, f'domain_{name}', None) if method: return method(domain, tables) return func(self, domain, tables, Model) return wrapper def order_method(func): @wraps(func) def wrapper(self, name, tables, Model): fname, _, oexpr = name.partition('.') assert fname == self.name method = getattr(Model, f'order_{fname}', None) if not oexpr and method: return method(tables) else: return func(self, name, tables, Model) return wrapper SQL_OPERATORS = { '=': operators.Equal, '!=': operators.NotEqual, 'like': partial(operators.Like, escape='\\'), 'not like': partial(operators.NotLike, escape='\\'), 'ilike': partial(operators.ILike, escape='\\'), 'not ilike': partial(operators.NotILike, escape='\\'), 'in': operators.In, 'not in': operators.NotIn, '<=': operators.LessEqual, '>=': operators.GreaterEqual, '<': operators.Less, '>': operators.Greater, } class Field(object): _type = None _sql_type = None _py_type = None def __init__(self, string='', help='', required=False, readonly=False, domain=None, states=None, on_change=None, on_change_with=None, depends=None, context=None, loading='eager'): ''' :param string: A string for label of the field. :param help: A multi-line help string. :param required: A boolean if ``True`` the field is required. :param readonly: A boolean if ``True`` the field is not editable in the user interface. :param domain: A list that defines a domain constraint. :param states: A dictionary. Possible keys are ``required``, ``readonly`` and ``invisible``. Values are pyson expressions that will be evaluated with record values. This allows to change dynamically the attributes of the field. :param on_change: A list of values. If set, the client will call the method ``on_change_`` when the user changes the field value. It then passes this list of values as arguments to the function. :param on_change_with: A list of values. Like ``on_change``, but defined the other way around. The list contains all the fields that must update the current field. :param depends: A set of field name on which this one depends. :param context: A dictionary which will be given to open the relation fields. :param loading: Define how the field must be loaded: ``lazy`` or ``eager``. ''' if not isinstance(string, LazyString): assert string, 'a string is required' self.string = string self.help = help self.required = required self.readonly = readonly self.__domain = None self.domain = domain self.__states = None self.states = states or {} self.on_change = set() if on_change: warnings.warn('on_change argument is deprecated, ' 'use the depends decorator', DeprecationWarning, stacklevel=3) self.on_change.update(on_change) self.on_change_with = set() if on_change_with: warnings.warn('on_change_with argument is deprecated, ' 'use the depends decorator', DeprecationWarning, stacklevel=3) self.on_change_with.update(on_change_with) self.__depends = None self.depends = depends or set() self.__context = None self.context = context or {} assert loading in ('lazy', 'eager'), \ 'loading must be "lazy" or "eager"' self.loading = loading self.name = None @property def string(self): return self.__string @string.setter def string(self, value): self.__string = StringPartitioned(value) @property def help(self): return self.__help @help.setter def help(self, value): self.__help = StringPartitioned(value) def _get_domain(self): return self.__domain def _set_domain(self, value): if value is None: value = [] domain_validate(value) self.__domain = value domain = property(_get_domain, _set_domain) def _get_states(self): return self.__states def _set_states(self, value): states_validate(value) self.__states = value states = property(_get_states, _set_states) def _get_depends(self): return self.__depends def _set_depends(self, value): value = set(value) depends_validate(value) self.__depends = value depends = property(_get_depends, _set_depends) @cached_property def display_depends(self): depends = get_eval_fields(self.states.get('invisible')) return self.depends | depends @cached_property def edition_depends(self): depends = get_eval_fields(self.domain) depends |= get_eval_fields(self.states.get('readonly')) depends |= get_eval_fields(self.states.get('required')) return self.depends | depends @cached_property def validation_depends(self): depends = get_eval_fields(self.domain) depends |= get_eval_fields(self.states.get('required')) return self.depends | depends def _get_context(self): return self.__context def _set_context(self, value): context_validate(value) self.__context = value context = property(_get_context, _set_context) def __get__(self, inst, cls): if inst is None: return self assert self.name is not None if self.name == 'id': return inst._id return inst.__getattr__(self.name) def __set__(self, inst, value): assert self.name is not None if isinstance(value, (Query, Expression)): raise ValueError("Can not assign SQL") if inst._values is None: inst._values = inst._record() if (self._py_type and value is not None and not isinstance(value, self._py_type)): value = self._py_type(value) inst._values[self.name] = value def sql_format(self, value): if isinstance(value, (Query, Expression)): return value assert self._sql_type is not None database = Transaction().database if (self._py_type and value is not None and not isinstance(value, self._py_type)): value = self._py_type(value) return database.sql_format(self._sql_type, value) def sql_type(self): database = Transaction().database return database.sql_type(self._sql_type) def sql_cast(self, expression): return Cast(expression, self.sql_type().base) def sql_column(self, table): return Column(table, self.name) def _domain_column(self, operator, column): return column def _domain_value(self, operator, value): if isinstance(value, (Select, CombiningQuery)): return value if operator in ('in', 'not in'): return [self.sql_format(v) for v in value if v is not None] else: return self.sql_format(value) def _domain_add_null(self, column, operator, value, expression): if operator in ('in', 'not in'): if (not isinstance(value, (Select, CombiningQuery)) and any(v is None for v in value)): if operator == 'in': expression |= (column == Null) else: expression &= (column != Null) return expression @domain_method def convert_domain(self, domain, tables, Model): "Return a SQL expression for the domain using tables" table, _ = tables[None] name, operator, value = domain Operator = SQL_OPERATORS[operator] column = self.sql_column(table) column = self._domain_column(operator, column) expression = Operator(column, self._domain_value(operator, value)) if isinstance(expression, operators.In) and not expression.right: expression = Literal(False) elif isinstance(expression, operators.NotIn) and not expression.right: expression = Literal(True) expression = self._domain_add_null(column, operator, value, expression) return expression @order_method def convert_order(self, name, tables, Model): "Return a SQL expression to order" table, _ = tables[None] return [self.sql_column(table)] def set_rpc(self, model): for attribute, decorator, result in ( ('on_change', None, on_change_result), ('on_change_with', on_change_with_result(self.name), None), ): if not getattr(self, attribute): continue func_name = '%s_%s' % (attribute, self.name) assert hasattr(model, func_name), \ 'Missing %s on model %s' % (func_name, model.__name__) model.__rpc__.setdefault( func_name, RPC(instantiate=0, decorator=decorator, result=result)) def definition(self, model, language): pool = Pool() Translation = pool.get('ir.translation') encoder = PYSONEncoder() definition = { 'context': encoder.encode(self.context), 'loading': self.loading, 'name': self.name, 'on_change': list(self.on_change), 'on_change_with': list(self.on_change_with), 'readonly': self.readonly, 'required': self.required, 'states': encoder.encode(self.states), 'type': self._type, 'domain': encoder.encode(self.domain), 'searchable': self.searchable(model), 'sortable': self.sortable(model), } # Add id to on_change's if they are not cached # Not having the id increase the efficiency of the cache for method in ['on_change', 'on_change_with']: changes = definition[method] if changes: method_name = method + '_' + self.name if not model.__rpc__[method_name].cache: changes.append('id') for name in changes: target = model if '.' in name: prefix, _ = name.rsplit('.', 1) prefix += '.' else: prefix = '' while name.startswith('_parent_'): field, name = name.split('.', 1) target = target._fields[field[8:]].get_target() field = target._fields[name] if field and field.context: eval_fields = get_eval_fields(field.context) for context_field_name in eval_fields: prefix_ctx_field_name = ( prefix + context_field_name) if (context_field_name in field.depends and prefix_ctx_field_name not in changes): changes.append(prefix_ctx_field_name) name = '%s,%s' % (model.__name__, self.name) for attr, ttype in [('string', 'field'), ('help', 'help')]: definition[attr] = '' for source in getattr(self, attr): if not isinstance(source, LazyString): source = ( Translation.get_source(name, ttype, language, source) or source) definition[attr] += source return definition def definition_translations(self, model, language): "Returns sources used for definition" name = '%s,%s' % (model.__name__, self.name) translations = [] for attr, ttype in [('string', 'field'), ('help', 'help')]: for source in getattr(self, attr): if not isinstance(source, LazyString): translations.append((name, ttype, language, source)) return translations def searchable(self, model): return hasattr(model, 'search') def sortable(self, model): return hasattr(model, 'search') class FieldTranslate(Field): def _get_translation_join( self, Model, name, translation, table, from_, language): if Model.__name__ == 'ir.model.field': pool = Pool() ModelData = pool.get('ir.model.data') ModelField = pool.get('ir.model.field') Translation = pool.get('ir.translation') model_data = ModelData.__table__() model_field = ModelField.__table__() msg_trans = Translation.__table__() if name == 'field_description': type_ = 'field' else: type_ = 'help' translation = translation.select( translation.id.as_('id'), translation.res_id.as_('res_id'), translation.value.as_('value'), translation.name.as_('name'), translation.lang.as_('lang'), translation.type.as_('type'), translation.fuzzy.as_('fuzzy'), ) translation |= (msg_trans .join(model_data, condition=(msg_trans.res_id == model_data.db_id) & (model_data.model == 'ir.message') & (msg_trans.name == 'ir.message,text')) .join(model_field, condition=Concat( Concat(model_data.module, '.'), model_data.fs_id) == getattr(model_field, name)) .select( msg_trans.id.as_('id'), Literal(-1).as_('res_id'), msg_trans.value.as_('value'), Concat( Concat(model_field.model, ','), model_field.name).as_('name'), msg_trans.lang.as_('lang'), Literal(type_).as_('type'), msg_trans.fuzzy.as_('fuzzy'), )) if backend.name == 'postgresql' and _sql_version >= (1, 1, 0): query = translation.select( translation.res_id.as_('res_id'), translation.value.as_('value'), translation.name.as_('name'), distinct=True, distinct_on=[translation.res_id, translation.name], order_by=[ translation.res_id, translation.name, translation.id.desc]) else: query = translation.select( translation.res_id.as_('res_id'), Min(translation.value).as_('value'), translation.name.as_('name'), group_by=[translation.res_id, translation.name]) if Model.__name__ == 'ir.model': name_ = Concat(Concat(table.model, ','), name) type_ = 'model' res_id = -1 elif Model.__name__ == 'ir.model.field': name_ = Concat(Concat(table.model, ','), table.name) if name == 'field_description': type_ = 'field' else: type_ = 'help' res_id = -1 else: name_ = '%s,%s' % (Model.__name__, name) type_ = 'model' res_id = table.id query.where = ( (translation.lang == language) & (translation.type == type_) & (translation.fuzzy == Literal(False)) ) return query, from_.join(query, 'LEFT', condition=(query.res_id == res_id) & (query.name == name_)) def _get_translation_column(self, Model, name): from trytond.ir.lang import get_parent_language pool = Pool() Translation = pool.get('ir.translation') table = join = Model.__table__() language = Transaction().language column = None while language: translation = Translation.__table__() translation, join = self._get_translation_join( Model, name, translation, table, join, language) column = Coalesce(NullIf(column, ''), translation.value) language = get_parent_language(language) return table, join, column @domain_method def convert_domain(self, domain, tables, Model): if not self.translate: return super(FieldTranslate, self).convert_domain( domain, tables, Model) table, _ = tables[None] name, operator, value = domain model, join, column = self._get_translation_column(Model, name) column = Coalesce(NullIf(column, ''), self.sql_column(model)) column = self._domain_column(operator, column) Operator = SQL_OPERATORS[operator] assert name == self.name where = Operator(column, self._domain_value(operator, value)) if isinstance(where, operators.In) and not where.right: where = Literal(False) elif isinstance(where, operators.NotIn) and not where.right: where = Literal(True) where = self._domain_add_null(column, operator, value, where) return table.id.in_(join.select(model.id, where=where)) def _get_translation_order(self, tables, Model, name): from trytond.ir.lang import get_parent_language pool = Pool() Translation = pool.get('ir.translation') table, _ = tables[None] join = table language = Transaction().language column = None while language: key = name + '.translation-' + language if key not in tables: translation = Translation.__table__() translation, join = self._get_translation_join( Model, name, translation, table, table, language) if join.left == table: tables[key] = { None: (join.right, join.condition), } else: tables[key] = { None: (join.left.right, join.left.condition), 'translation': { None: (join.right, join.condition), }, } else: if 'translation' not in tables[key]: translation, _ = tables[key][None] else: translation, _ = tables[key]['translation'][None] column = Coalesce(NullIf(column, ''), translation.value) language = get_parent_language(language) return column @order_method def convert_order(self, name, tables, Model): if not self.translate: return super().convert_order(name, tables, Model) assert name == self.name table, _ = tables[None] column = self._get_translation_order(tables, Model, name) return [Coalesce(NullIf(column, ''), self.sql_column(table))] def definition(self, model, language): definition = super().definition(model, language) definition['translate'] = self.translate return definition