Files
tradon/model/fields/dict.py
2025-12-26 13:11:43 +00:00

222 lines
8.2 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the toplevel of this
# repository contains the full copyright notices and license terms.
import json
from functools import partial
from sql import Cast, CombiningQuery, Literal, Null, Select, operators
from trytond import backend
from trytond.pool import Pool
from trytond.protocols.jsonrpc import JSONDecoder, JSONEncoder
from trytond.tools import grouped_slice
from trytond.tools.immutabledict import ImmutableDict
from trytond.transaction import Transaction
from .field import SQL_OPERATORS, Field, domain_method, order_method
# Canonical JSON serializer shared by this module: compact separators, keys
# sorted and non-ASCII characters kept as-is, so that equal dictionaries are
# always dumped to the same text.
dumps = partial(
    json.dumps,
    cls=JSONEncoder,
    separators=(',', ':'),
    sort_keys=True,
    ensure_ascii=False,
)
class Dict(Field):
    """Define dict field.

    Values are written as canonical JSON text (see ``sql_format``) and read
    back as ``ImmutableDict`` instances (see ``get`` and ``__set__``).
    """
    _type = 'dict'
    _sql_type = 'JSON'
    _py_type = dict

    def __init__(self, schema_model, string='', help='', required=False,
            readonly=False, domain=None, states=None,
            on_change=None, on_change_with=None, depends=None,
            context=None, loading='lazy'):
        """Initialize the field.

        :param schema_model: kept on the field as ``schema_model``; it is
            exposed by ``definition`` and looked up in the pool by
            ``TranslatedDict``.

        The remaining arguments are forwarded unchanged to the parent
        ``Field`` constructor.
        """
        super().__init__(
            string, help, required, readonly, domain, states, on_change,
            on_change_with, depends, context, loading)
        self.schema_model = schema_model
        # Makes 'ilike' searches go through database.unaccent.
        self.search_unaccented = True

    def get(self, ids, model, name, values=None):
        """Return a dictionary mapping each id of ``ids`` to its value.

        Each row of ``values`` is read through its ``name`` and ``'id'``
        items.  A non-empty value becomes an ``ImmutableDict`` in which
        every list has been replaced by a tuple; ids without a non-empty
        value map to ``None``.
        """
        result = dict.fromkeys(ids)
        for row in values or []:
            data = row[name]
            if not data:
                continue
            # The backend may already have converted the JSON; only text
            # still needs decoding here.
            if isinstance(data, str):
                data = json.loads(data, object_hook=JSONDecoder())
            for key, val in data.items():
                if isinstance(val, list):
                    data[key] = tuple(val)
            result[row['id']] = ImmutableDict(data)
        return result

    def sql_format(self, value):
        """Return ``value`` formatted for the database.

        The parent formatting is applied first.  If the result is a dict it
        is dumped to canonical JSON after dropping the ``None`` entries;
        when ``schema_model`` is set, empty lists/tuples are dropped too and
        the other lists/tuples are stored as sorted lists of their distinct
        items.  Any other result is returned untouched.
        """
        value = super().sql_format(value)
        if not isinstance(value, dict):
            return value
        cleaned = {}
        for key, item in value.items():
            if item is None:
                continue
            if self.schema_model and isinstance(item, (list, tuple)):
                if not item:
                    continue
                item = sorted(set(item))
            cleaned[key] = item
        return dumps(cleaned)

    def __set__(self, inst, value):
        """Store ``value`` on ``inst``, wrapping truthy values in an
        ``ImmutableDict``; falsy values are passed through as they are."""
        super().__set__(inst, ImmutableDict(value) if value else value)

    def translated(self, name=None, type_='values'):
        """Return a descriptor for the translated value of the field.

        ``name`` defaults to the name of the field; ``ValueError`` is raised
        when neither is available.  ``type_`` is handed to
        ``TranslatedDict``.
        """
        target = self.name if name is None else name
        if target is None:
            raise ValueError('Missing name argument')
        return TranslatedDict(target, type_)

    def _domain_column(self, operator, column, key=None):
        """Return the SQL expression extracting ``key`` from ``column``.

        For operators ending with 'like' the extracted value is cast to the
        database VARCHAR type, and additionally unaccented for 'ilike'
        operators when ``search_unaccented`` is set.
        """
        database = Transaction().database
        base_column = super()._domain_column(operator, column)
        column = database.json_get(base_column, key)
        if operator.endswith('like'):
            column = Cast(column, database.sql_type('VARCHAR').base)
            if self.search_unaccented and operator.endswith('ilike'):
                column = database.unaccent(column)
        return column

    def _domain_value(self, operator, value):
        """Return ``value`` converted for comparison with a JSON element.

        Sub-queries are returned as they are.  Operators ending with 'in'
        get a list with each item dumped to JSON, the other operators a
        single JSON dump (unaccented for 'ilike' when ``search_unaccented``
        is set).
        """
        if backend.name == 'sqlite' and isinstance(value, bool):
            # json_extract returns 0 for JSON false and 1 for JSON true
            value = int(value)
        if isinstance(value, (Select, CombiningQuery)):
            return value
        if self.schema_model and isinstance(value, (list, tuple)):
            # Same normalisation as the one applied by sql_format.
            value = sorted(set(value))
        if operator.endswith('in'):
            return [dumps(item) for item in value]
        encoded = dumps(value)
        if self.search_unaccented and operator.endswith('ilike'):
            encoded = Transaction().database.unaccent(encoded)
        return encoded

    def _domain_add_null(self, column, operator, value, expression):
        """Extend the parent NULL handling for comparisons against ``None``.

        With a ``None`` value, '=' also accepts a NULL ``column`` while the
        other operators ending with '=' also require a non NULL ``column``.
        """
        expression = super()._domain_add_null(
            column, operator, value, expression)
        if value is not None or not operator.endswith('='):
            return expression
        if operator == '=':
            expression |= (column == Null)
        else:
            expression &= (column != Null)
        return expression

    @domain_method
    def convert_domain(self, domain, tables, Model):
        """Return the SQL expression for a domain clause on the field.

        A clause whose name has no '.' is left to the parent
        implementation.  Otherwise the name is split once on '.' into the
        field name and the key searched inside the stored dictionary.
        """
        name, operator, value = domain[:3]
        if '.' not in name:
            return super().convert_domain(domain, tables, Model)

        database = Transaction().database
        table, _ = tables[None]
        name, key = name.split('.', 1)
        Operator = SQL_OPERATORS[operator]
        raw_column = self.sql_column(table)
        column = self._domain_column(operator, raw_column, key)
        # Generic comparison, kept whenever the backend does not implement
        # the JSON specific operators tried below.
        expression = Operator(column, self._domain_value(operator, value))

        if operator in {'=', '!='}:
            # Prefer the custom operators as they may be backed by indexes
            try:
                if value is None:
                    expression = database.json_key_exists(raw_column, key)
                    if operator == '=':
                        expression = operators.Not(expression)
                # Multi-selection values are compared by equality and not
                # by containment, so lists and tuples keep the generic
                # expression.
                elif not isinstance(value, (list, tuple)):
                    expression = database.json_contains(
                        raw_column, dumps({key: value}))
                    if operator == '!=':
                        expression = operators.Not(expression)
                        expression &= database.json_key_exists(
                            raw_column, key)
                    # NOTE(review): the indentation of SOURCE was lost;
                    # confirm that this return belongs to the scalar branch.
                    return expression
            except NotImplementedError:
                pass
        elif operator.endswith('in'):
            negated = operator.startswith('not')
            if value:
                # Prefer the custom operators as they may be backed by
                # indexes
                op = '!=' if negated else '='
                try:
                    in_expr = Literal(False)
                    for item in value:
                        in_expr |= database.json_contains(
                            self._domain_column(op, raw_column, key),
                            dumps(item))
                    if negated:
                        in_expr = ~in_expr
                    expression = in_expr
                except NotImplementedError:
                    pass
            else:
                # Nothing to compare with: the result is constant.
                expression = Literal(negated)

        expression = self._domain_add_null(column, operator, value, expression)
        return expression

    @order_method
    def convert_order(self, name, tables, Model):
        """Return the list of SQL expressions used to order by ``name``.

        Without a key after the first '.', the parent implementation is
        used; otherwise the order is the JSON element stored under that key.
        """
        fname, _, key = name.partition('.')
        if not key:
            return super().convert_order(fname, tables, Model)
        database = Transaction().database
        table, _ = tables[None]
        return [database.json_get(self.sql_column(table), key)]

    def definition(self, model, language):
        """Return the parent definition completed with ``schema_model``."""
        result = super().definition(model, language)
        result['schema_model'] = self.schema_model
        return result
class TranslatedDict(object):
    """A descriptor for translated values of Dict field.

    Depending on ``type_`` the descriptor returns, for the keys present in
    the value of the field, either the ``'string'`` of each key ('keys') or
    the value of the field with selection values replaced using the
    ``'selection'`` of their key ('values').
    """

    def __init__(self, name, type_):
        """Create the descriptor for the Dict field named ``name``.

        :param name: attribute name of the Dict field to read.
        :param type_: either 'keys' or 'values'.
        :raises ValueError: if ``type_`` is neither 'keys' nor 'values'.
        """
        # An explicit check instead of an assert: asserts are removed when
        # Python runs with -O, and __get__ would then silently return None
        # for an unknown type.
        if type_ not in ('keys', 'values'):
            raise ValueError(
                "Invalid type_ %r, must be 'keys' or 'values'" % (type_,))
        self.name = name
        self.type_ = type_

    def __get__(self, inst, cls):
        """Return the translated keys or values of the field for ``inst``.

        The descriptor itself is returned when accessed on the class, and
        an empty value of the field is returned untouched.
        """
        if inst is None:
            return self
        pool = Pool()
        schema_model = getattr(cls, self.name).schema_model
        SchemaModel = pool.get(schema_model)

        value = getattr(inst, self.name)
        if not value:
            return value

        domain = []
        if self.type_ == 'values':
            # Only the selection keys are used to translate values.
            domain = [('type_', '=', 'selection')]

        # Search the schema records by groups of key names.
        records = []
        for key_names in grouped_slice(value.keys()):
            records += SchemaModel.search([
                    ('name', 'in', key_names),
                    ] + domain)
        keys = SchemaModel.get_keys(records)

        if self.type_ == 'keys':
            return {k['name']: k['string'] for k in keys}
        elif self.type_ == 'values':
            trans = {k['name']: dict(k['selection']) for k in keys}
            # Values without a matching selection entry are kept as they
            # are.
            return {k: v if k not in trans else trans[k].get(v, v)
                for k, v in value.items()}