# This file is part of Tryton. The COPYRIGHT file at the toplevel of this # repository contains the full copyright notices and license terms. import json from functools import partial from sql import Cast, CombiningQuery, Literal, Null, Select, operators from trytond import backend from trytond.pool import Pool from trytond.protocols.jsonrpc import JSONDecoder, JSONEncoder from trytond.tools import grouped_slice from trytond.tools.immutabledict import ImmutableDict from trytond.transaction import Transaction from .field import SQL_OPERATORS, Field, domain_method, order_method # Use canonical form dumps = partial( json.dumps, cls=JSONEncoder, separators=(',', ':'), sort_keys=True, ensure_ascii=False) class Dict(Field): 'Define dict field.' _type = 'dict' _sql_type = 'JSON' _py_type = dict def __init__(self, schema_model, string='', help='', required=False, readonly=False, domain=None, states=None, on_change=None, on_change_with=None, depends=None, context=None, loading='lazy'): super(Dict, self).__init__(string, help, required, readonly, domain, states, on_change, on_change_with, depends, context, loading) self.schema_model = schema_model self.search_unaccented = True def get(self, ids, model, name, values=None): dicts = dict((id, None) for id in ids) for value in values or []: data = value[name] if data: # If stored as JSON conversion is done on backend if isinstance(data, str): data = json.loads(data, object_hook=JSONDecoder()) for key, val in data.items(): if isinstance(val, list): data[key] = tuple(val) dicts[value['id']] = ImmutableDict(data) return dicts def sql_format(self, value): value = super().sql_format(value) if isinstance(value, dict): d = {} for k, v in value.items(): if v is None: continue if self.schema_model and isinstance(v, (list, tuple)): if not v: continue v = list(sorted(set(v))) d[k] = v value = dumps(d) return value def __set__(self, inst, value): if value: value = ImmutableDict(value) super().__set__(inst, value) def translated(self, name=None, type_='values'): "Return a descriptor for the translated value of the field" if name is None: name = self.name if name is None: raise ValueError('Missing name argument') return TranslatedDict(name, type_) def _domain_column(self, operator, column, key=None): database = Transaction().database column = database.json_get( super()._domain_column(operator, column), key) if operator.endswith('like'): column = Cast(column, database.sql_type('VARCHAR').base) if self.search_unaccented and operator.endswith('ilike'): column = database.unaccent(column) return column def _domain_value(self, operator, value): if backend.name == 'sqlite' and isinstance(value, bool): # json_extract returns 0 for JSON false and 1 for JSON true value = int(value) if isinstance(value, (Select, CombiningQuery)): return value if self.schema_model and isinstance(value, (list, tuple)): value = sorted(set(value)) if operator.endswith('in'): return [dumps(v) for v in value] else: value = dumps(value) if self.search_unaccented and operator.endswith('ilike'): database = Transaction().database value = database.unaccent(value) return value def _domain_add_null(self, column, operator, value, expression): expression = super()._domain_add_null( column, operator, value, expression) if value is None and operator.endswith('='): if operator == '=': expression |= (column == Null) else: expression &= (column != Null) return expression @domain_method def convert_domain(self, domain, tables, Model): name, operator, value = domain[:3] if '.' not in name: return super().convert_domain(domain, tables, Model) database = Transaction().database table, _ = tables[None] name, key = name.split('.', 1) Operator = SQL_OPERATORS[operator] raw_column = self.sql_column(table) column = self._domain_column(operator, raw_column, key) expression = Operator(column, self._domain_value(operator, value)) if operator in {'=', '!='}: # Try to use custom operators in case there is indexes try: if value is None: expression = database.json_key_exists( raw_column, key) if operator == '=': expression = operators.Not(expression) # we compare on multi-selection by doing an equality check and # not a contain check elif not isinstance(value, (list, tuple)): expression = database.json_contains( raw_column, dumps({key: value})) if operator == '!=': expression = operators.Not(expression) expression &= database.json_key_exists( raw_column, key) return expression except NotImplementedError: pass elif operator.endswith('in'): # Try to use custom operators in case there is indexes if not value: expression = Literal(operator.startswith('not')) else: op = '!=' if operator.startswith('not') else '=' try: in_expr = Literal(False) for v in value: in_expr |= database.json_contains( self._domain_column(op, raw_column, key), dumps(v)) if operator.startswith('not'): in_expr = ~in_expr expression = in_expr except NotImplementedError: pass expression = self._domain_add_null(column, operator, value, expression) return expression @order_method def convert_order(self, name, tables, Model): fname, _, key = name.partition('.') if not key: return super().convert_order(fname, tables, Model) database = Transaction().database table, _ = tables[None] column = self.sql_column(table) return [database.json_get(column, key)] def definition(self, model, language): definition = super().definition(model, language) definition['schema_model'] = self.schema_model return definition class TranslatedDict(object): 'A descriptor for translated values of Dict field' def __init__(self, name, type_): assert type_ in ['keys', 'values'] self.name = name self.type_ = type_ def __get__(self, inst, cls): if inst is None: return self pool = Pool() schema_model = getattr(cls, self.name).schema_model SchemaModel = pool.get(schema_model) value = getattr(inst, self.name) if not value: return value domain = [] if self.type_ == 'values': domain = [('type_', '=', 'selection')] records = [] for key_names in grouped_slice(value.keys()): records += SchemaModel.search([ ('name', 'in', key_names), ] + domain) keys = SchemaModel.get_keys(records) if self.type_ == 'keys': return {k['name']: k['string'] for k in keys} elif self.type_ == 'values': trans = {k['name']: dict(k['selection']) for k in keys} return {k: v if k not in trans else trans[k].get(v, v) for k, v in value.items()}