1247 lines
42 KiB
Python
Executable File
1247 lines
42 KiB
Python
Executable File
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
"User"
|
|
|
|
import copy
|
|
import datetime
|
|
import hashlib
|
|
import logging
|
|
import mmap
|
|
import random
|
|
import re
|
|
import string
|
|
import time
|
|
import uuid
|
|
|
|
try:
|
|
import secrets
|
|
except ImportError:
|
|
secrets = None
|
|
import ipaddress
|
|
import warnings
|
|
from ast import literal_eval
|
|
from functools import wraps
|
|
from itertools import groupby
|
|
from operator import attrgetter
|
|
|
|
from passlib.context import CryptContext
|
|
from sql import Literal, Null
|
|
from sql.aggregate import Count
|
|
from sql.conditionals import Case
|
|
from sql.functions import CurrentTimestamp
|
|
|
|
try:
|
|
import bcrypt
|
|
except ImportError:
|
|
bcrypt = None
|
|
try:
|
|
import argon2
|
|
except ImportError:
|
|
argon2 = None
|
|
|
|
from trytond.cache import Cache
|
|
from trytond.config import config
|
|
from trytond.exceptions import LoginException, RateLimitException, UserError
|
|
from trytond.i18n import gettext
|
|
from trytond.model import (
|
|
DeactivableMixin, Index, ModelSQL, ModelView, Unique, Workflow,
|
|
avatar_mixin, dualmethod, fields)
|
|
from trytond.model.exceptions import ValidationError
|
|
from trytond.pool import Pool
|
|
from trytond.pyson import Bool, Eval, PYSONEncoder
|
|
from trytond.report import Report, get_email
|
|
from trytond.rpc import RPC
|
|
from trytond.sendmail import send_message_transactional
|
|
from trytond.tools import grouped_slice
|
|
from trytond.tools.email_ import (
|
|
EmailNotValidError, normalize_email, set_from_header, validate_email)
|
|
from trytond.transaction import Transaction, without_check_access
|
|
from trytond.url import host, http_host
|
|
from trytond.wizard import Button, StateTransition, StateView, Wizard
|
|
|
|
logger = logging.getLogger(__name__)
|
|
_has_password = 'password' in re.split('[,+]', config.get(
|
|
'session', 'authentications', default='password'))
|
|
|
|
passlib_path = config.get('password', 'passlib')
|
|
if passlib_path:
|
|
CRYPT_CONTEXT = CryptContext.from_path(passlib_path)
|
|
else:
|
|
schemes = ['pbkdf2_sha512']
|
|
if bcrypt:
|
|
schemes.insert(0, 'bcrypt')
|
|
schemes.insert(0, 'scrypt')
|
|
if argon2:
|
|
schemes.insert(0, 'argon2')
|
|
CRYPT_CONTEXT = CryptContext(schemes=schemes)
|
|
|
|
|
|
def gen_password(length=8):
|
|
alphabet = string.ascii_letters + string.digits
|
|
if secrets:
|
|
choice = secrets.choice
|
|
else:
|
|
sysrand = random.SystemRandom()
|
|
choice = sysrand.choice
|
|
return ''.join(choice(alphabet) for _ in range(length))
|
|
|
|
|
|
def _send_email(from_, users, email_func):
|
|
from_cfg = config.get('email', 'from')
|
|
for user in users:
|
|
if not user.email:
|
|
logger.info("Missing address for '%s' to send email", user.login)
|
|
continue
|
|
msg, title = email_func(user)
|
|
set_from_header(msg, from_cfg, from_ or from_cfg)
|
|
msg['To'] = user.email
|
|
msg['Subject'] = title
|
|
send_message_transactional(msg)
|
|
|
|
|
|
class PasswordError(UserError):
|
|
pass
|
|
|
|
|
|
class DeleteError(UserError):
|
|
pass
|
|
|
|
|
|
class UserValidationError(ValidationError):
|
|
pass
|
|
|
|
|
|
class User(avatar_mixin(100, 'login'), DeactivableMixin, ModelSQL, ModelView):
|
|
"User"
|
|
__name__ = "res.user"
|
|
name = fields.Char('Name')
|
|
login = fields.Char('Login', required=True)
|
|
password_hash = fields.Char('Password Hash', strip=False)
|
|
password = fields.Function(fields.Char(
|
|
"Password",
|
|
states={
|
|
'invisible': not _has_password,
|
|
}),
|
|
getter='get_password', setter='set_password')
|
|
password_reset = fields.Char(
|
|
"Reset Password", strip=False,
|
|
states={
|
|
'invisible': not _has_password,
|
|
})
|
|
password_reset_expire = fields.Timestamp(
|
|
"Reset Password Expire",
|
|
states={
|
|
'required': Bool(Eval('password_reset')),
|
|
'invisible': not _has_password,
|
|
},
|
|
depends=['password_reset'])
|
|
signature = fields.Text('Signature')
|
|
menu = fields.Many2One('ir.action', 'Menu Action',
|
|
domain=[('usage', '=', 'menu')], required=True)
|
|
pyson_menu = fields.Function(fields.Char('PySON Menu'), 'get_pyson_menu')
|
|
actions = fields.Many2Many('res.user-ir.action', 'user', 'action',
|
|
'Actions', help='Actions that will be run at login.',
|
|
size=5)
|
|
groups = fields.Many2Many('res.user-res.group',
|
|
'user', 'group', 'Groups')
|
|
applications = fields.One2Many(
|
|
'res.user.application', 'user', "Applications")
|
|
language = fields.Many2One('ir.lang', 'Language',
|
|
domain=['OR',
|
|
('translatable', '=', True),
|
|
])
|
|
language_direction = fields.Function(fields.Char('Language Direction'),
|
|
'get_language_direction')
|
|
email = fields.Char('Email')
|
|
status_bar = fields.Function(fields.Char('Status Bar'), 'get_status_bar')
|
|
avatar_badge_url = fields.Function(
|
|
fields.Char("Avatar Badge URL"), 'get_avatar_badge_url')
|
|
warnings = fields.One2Many('res.user.warning', 'user', 'Warnings')
|
|
sessions = fields.Function(fields.Integer('Sessions'),
|
|
'get_sessions')
|
|
_get_groups_cache = Cache('res_user.get_groups', context=False)
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(User, cls).__setup__()
|
|
cls.__rpc__.update({
|
|
'get_preferences': RPC(check_access=False),
|
|
'set_preferences': RPC(
|
|
readonly=False, check_access=False, fresh_session=True),
|
|
'get_preferences_fields_view': RPC(check_access=False),
|
|
})
|
|
table = cls.__table__()
|
|
cls._sql_constraints += [
|
|
('login_key', Unique(table, table.login),
|
|
'You can not have two users with the same login!')
|
|
]
|
|
cls._buttons.update({
|
|
'reset_password': {
|
|
'invisible': ~Eval('email', True) | (not _has_password),
|
|
},
|
|
})
|
|
cls._preferences_fields = [
|
|
'name',
|
|
'password',
|
|
'email',
|
|
'signature',
|
|
'menu',
|
|
'pyson_menu',
|
|
'actions',
|
|
'status_bar',
|
|
'avatar',
|
|
'avatar_url',
|
|
'avatar_badge_url',
|
|
'warnings',
|
|
'applications',
|
|
]
|
|
cls._context_fields = [
|
|
'language',
|
|
'language_direction',
|
|
'groups',
|
|
]
|
|
cls._order.insert(0, ('name', 'ASC'))
|
|
|
|
@classmethod
|
|
def __register__(cls, module_name):
|
|
pool = Pool()
|
|
ModelData = pool.get('ir.model.data')
|
|
model_data = ModelData.__table__()
|
|
cursor = Transaction().connection.cursor()
|
|
super(User, cls).__register__(module_name)
|
|
|
|
# Migration from 5.6: Set noupdate to admin
|
|
cursor.execute(*model_data.update(
|
|
[model_data.noupdate], [True],
|
|
where=(model_data.model == cls.__name__)
|
|
& (model_data.module == 'res')
|
|
& (model_data.fs_id == 'user_admin')))
|
|
|
|
@staticmethod
|
|
def default_menu():
|
|
pool = Pool()
|
|
Action = pool.get('ir.action')
|
|
actions = Action.search([
|
|
('usage', '=', 'menu'),
|
|
], limit=1)
|
|
if actions:
|
|
return actions[0].id
|
|
return None
|
|
|
|
def get_pyson_menu(self, name):
|
|
encoder = PYSONEncoder()
|
|
return encoder.encode(self.menu.get_action_value())
|
|
|
|
def get_language_direction(self, name):
|
|
pool = Pool()
|
|
Lang = pool.get('ir.lang')
|
|
if self.language:
|
|
return self.language.direction
|
|
else:
|
|
return Lang.default_direction()
|
|
|
|
def get_status_bar(self, name):
|
|
return self.name
|
|
|
|
def get_avatar_badge_url(self, name):
|
|
pass
|
|
|
|
def get_password(self, name):
|
|
return 'x' * 10
|
|
|
|
@classmethod
|
|
def set_password(cls, users, name, value):
|
|
if value == 'x' * 10:
|
|
return
|
|
|
|
if Transaction().user and value:
|
|
cls.validate_password(value, users)
|
|
|
|
to_write = []
|
|
for user in users:
|
|
to_write.extend([[user], {
|
|
'password_hash': cls.hash_password(value),
|
|
'password_reset': None,
|
|
'password_reset_expire': None,
|
|
}])
|
|
cls.write(*to_write)
|
|
|
|
@classmethod
|
|
def validate_password(cls, password, users):
|
|
password_b = password
|
|
if isinstance(password, str):
|
|
password_b = password.encode('utf-8')
|
|
length = config.getint('password', 'length', default=0)
|
|
if length > 0:
|
|
if len(password_b) < length:
|
|
raise PasswordError(gettext('res.msg_password_length',
|
|
length=length,
|
|
))
|
|
path = config.get('password', 'forbidden', default=None)
|
|
if path:
|
|
with open(path, 'r') as f:
|
|
forbidden = mmap.mmap(
|
|
f.fileno(), 0, access=mmap.ACCESS_READ)
|
|
if forbidden.find(password_b) >= 0:
|
|
raise PasswordError(gettext('res.msg_password_forbidden'))
|
|
for user in users:
|
|
# Use getattr to allow to use non User instances
|
|
for test, message in [
|
|
(getattr(user, 'name', ''), 'res.msg_password_name'),
|
|
(getattr(user, 'login', ''), 'res.msg_password_login'),
|
|
(getattr(user, 'email', ''), 'res.msg_password_email'),
|
|
]:
|
|
if test and password.lower() == test.lower():
|
|
raise PasswordError(gettext(message))
|
|
|
|
@dualmethod
|
|
@ModelView.button
|
|
def reset_password(cls, users, length=8, from_=None):
|
|
length = max(length, config.getint('password', 'length', default=0))
|
|
for user in users:
|
|
user.password_reset = gen_password(length=length)
|
|
user.password_reset_expire = (
|
|
datetime.datetime.now() + datetime.timedelta(
|
|
seconds=config.getint('password', 'reset_timeout')))
|
|
cls.save(users)
|
|
_send_email(from_, users, cls.get_email_reset_password)
|
|
|
|
def get_email_reset_password(self):
|
|
return get_email(
|
|
'res.user.email_reset_password', self, self.languages)
|
|
|
|
@property
|
|
def languages(self):
|
|
pool = Pool()
|
|
Lang = pool.get('ir.lang')
|
|
if self.language:
|
|
languages = [self.language]
|
|
else:
|
|
languages = Lang.search([
|
|
('code', '=', Transaction().language),
|
|
])
|
|
return languages
|
|
|
|
@staticmethod
|
|
def get_sessions(users, name):
|
|
Session = Pool().get('ir.session')
|
|
now = datetime.datetime.now()
|
|
timeout = datetime.timedelta(
|
|
seconds=config.getint('session', 'max_age'))
|
|
result = dict((u.id, 0) for u in users)
|
|
with without_check_access():
|
|
for sub_ids in grouped_slice(users):
|
|
sessions = Session.search([
|
|
('create_uid', 'in', sub_ids),
|
|
], order=[('create_uid', 'ASC')])
|
|
|
|
def filter_(session):
|
|
timestamp = session.write_date or session.create_date
|
|
return abs(timestamp - now) < timeout
|
|
result.update(dict((i, len(list(g)))
|
|
for i, g in groupby(filter(filter_, sessions),
|
|
attrgetter('create_uid.id'))))
|
|
return result
|
|
|
|
@staticmethod
|
|
def _convert_vals(vals):
|
|
vals = vals.copy()
|
|
pool = Pool()
|
|
Action = pool.get('ir.action')
|
|
if 'menu' in vals:
|
|
vals['menu'] = Action.get_action_id(vals['menu'])
|
|
if vals.get('email'):
|
|
vals['email'] = normalize_email(vals['email'])
|
|
return vals
|
|
|
|
@classmethod
|
|
def read(cls, ids, fields_names):
|
|
result = super(User, cls).read(ids, fields_names)
|
|
cache = Transaction().get_cache().get(cls.__name__)
|
|
for values in result:
|
|
if 'password_hash' in values:
|
|
values['password_hash'] = None
|
|
if cache and values['id'] in cache:
|
|
cache[values['id']]['password_hash'] = None
|
|
return result
|
|
|
|
@classmethod
|
|
def search(
|
|
cls, domain, offset=0, limit=None, order=None, count=False,
|
|
query=False):
|
|
users = super().search(
|
|
domain, offset=offset, limit=limit, order=order, count=count,
|
|
query=query)
|
|
if not count and not query:
|
|
cache = Transaction().get_cache().get(cls.__name__)
|
|
if cache is not None:
|
|
for user in users:
|
|
if user.id in cache:
|
|
cache[user.id]['password_hash'] = None
|
|
return users
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
vlist = [cls._convert_vals(vals) for vals in vlist]
|
|
return super(User, cls).create(vlist)
|
|
|
|
@classmethod
|
|
def write(cls, users, values, *args):
|
|
pool = Pool()
|
|
Session = pool.get('ir.session')
|
|
UserDevice = pool.get('res.user.device')
|
|
|
|
actions = iter((users, values) + args)
|
|
all_users = []
|
|
session_to_clear = []
|
|
users_to_clear = []
|
|
args = []
|
|
for users, values in zip(actions, actions):
|
|
all_users += users
|
|
args.extend((users, cls._convert_vals(values)))
|
|
|
|
if values.keys() & {'active', 'password'}:
|
|
session_to_clear += users
|
|
users_to_clear += [u.login for u in users]
|
|
|
|
super(User, cls).write(*args)
|
|
|
|
Session.clear(session_to_clear)
|
|
UserDevice.clear(users_to_clear)
|
|
|
|
# Clean cursor cache as it could be filled by domain_get
|
|
for cache in Transaction().cache.values():
|
|
if cls.__name__ in cache:
|
|
for user in all_users:
|
|
cache[cls.__name__].pop(user.id, None)
|
|
|
|
@classmethod
|
|
def delete(cls, users):
|
|
raise DeleteError(gettext('res.msg_user_delete_forbidden'))
|
|
|
|
def get_rec_name(self, name):
|
|
return self.name if self.name else self.login
|
|
|
|
@classmethod
|
|
def search_rec_name(cls, name, clause):
|
|
if clause[1].startswith('!') or clause[1].startswith('not '):
|
|
bool_op = 'AND'
|
|
else:
|
|
bool_op = 'OR'
|
|
return [bool_op,
|
|
('login',) + tuple(clause[1:]),
|
|
('name',) + tuple(clause[1:]),
|
|
]
|
|
|
|
@classmethod
|
|
def copy(cls, users, default=None):
|
|
if default is None:
|
|
default = {}
|
|
default = default.copy()
|
|
|
|
default['password_hash'] = None
|
|
default['password_reset'] = None
|
|
default.setdefault('warnings')
|
|
default.setdefault('applications')
|
|
|
|
new_users = []
|
|
for user in users:
|
|
default['login'] = user.login + ' (copy)'
|
|
new_user, = super(User, cls).copy([user], default)
|
|
new_users.append(new_user)
|
|
return new_users
|
|
|
|
@classmethod
|
|
def validate_fields(cls, users, fields_names):
|
|
super().validate_fields(users, fields_names)
|
|
cls.check_valid_email(users, fields_names)
|
|
|
|
@classmethod
|
|
def check_valid_email(cls, users, fields_names=None):
|
|
if fields_names and 'email' not in fields_names:
|
|
return
|
|
for user in users:
|
|
if user.email:
|
|
try:
|
|
validate_email(user.email)
|
|
except EmailNotValidError as e:
|
|
raise UserValidationError(gettext(
|
|
'res.msg_email_invalid',
|
|
user=user.rec_name,
|
|
email=user.email),
|
|
str(e)) from e
|
|
|
|
@classmethod
|
|
def _get_preferences(cls, user, context_only=False):
|
|
pool = Pool()
|
|
ModelData = pool.get('ir.model.data')
|
|
Action = pool.get('ir.action')
|
|
Config = pool.get('ir.configuration')
|
|
ConfigItem = pool.get('ir.module.config_wizard.item')
|
|
Lang = pool.get('ir.lang')
|
|
|
|
res = {}
|
|
if context_only:
|
|
fields = cls._context_fields
|
|
else:
|
|
fields = cls._preferences_fields + cls._context_fields
|
|
for field in fields:
|
|
if cls._fields[field]._type in ('many2one',):
|
|
if field == 'language':
|
|
if user.language:
|
|
res['language'] = user.language.code
|
|
else:
|
|
res['language'] = Config.get_language()
|
|
else:
|
|
if getattr(user, field):
|
|
res[field] = getattr(user, field).id
|
|
res[field + '.rec_name'] = \
|
|
getattr(user, field).rec_name
|
|
elif cls._fields[field]._type in ('one2many', 'many2many'):
|
|
res[field] = [x.id for x in getattr(user, field)]
|
|
admin_id = ModelData.get_id('res.user_admin')
|
|
if field == 'actions' and user.id == admin_id:
|
|
config_wizard_id = ModelData.get_id('ir',
|
|
'act_module_config_wizard')
|
|
action_id = Action.get_action_id(config_wizard_id)
|
|
if action_id in res[field]:
|
|
res[field].remove(action_id)
|
|
if ConfigItem.search([
|
|
('state', '=', 'open'),
|
|
]):
|
|
res[field].insert(0, action_id)
|
|
else:
|
|
res[field] = getattr(user, field)
|
|
|
|
if user.language:
|
|
language = user.language
|
|
else:
|
|
try:
|
|
language = Lang.get(Config.get_language())
|
|
except ValueError:
|
|
language = None
|
|
if language:
|
|
date = language.date
|
|
for i, j in [('%a', ''), ('%A', ''), ('%b', '%m'), ('%B', '%m'),
|
|
('%j', ''), ('%U', ''), ('%w', ''), ('%W', '')]:
|
|
date = date.replace(i, j)
|
|
res['locale'] = {
|
|
'date': date,
|
|
'grouping': literal_eval(language.grouping),
|
|
'decimal_point': language.decimal_point,
|
|
'thousands_sep': language.thousands_sep,
|
|
'mon_grouping': literal_eval(language.mon_grouping),
|
|
'mon_decimal_point': language.mon_decimal_point,
|
|
'mon_thousands_sep': language.mon_thousands_sep,
|
|
'p_sign_posn': language.p_sign_posn,
|
|
'n_sign_posn': language.n_sign_posn,
|
|
'positive_sign': language.positive_sign,
|
|
'negative_sign': language.negative_sign,
|
|
'p_cs_precedes': language.p_cs_precedes,
|
|
'n_cs_precedes': language.n_cs_precedes,
|
|
'p_sep_by_space': language.p_sep_by_space,
|
|
'n_sep_by_space': language.n_sep_by_space,
|
|
}
|
|
return res
|
|
|
|
@classmethod
|
|
def get_preferences(cls, context_only=False):
|
|
user = Transaction().user
|
|
user = cls(user)
|
|
return cls._get_preferences(user, context_only=context_only)
|
|
|
|
@classmethod
|
|
def set_preferences(cls, values):
|
|
'''
|
|
Set user preferences
|
|
'''
|
|
pool = Pool()
|
|
Lang = pool.get('ir.lang')
|
|
values_clean = values.copy()
|
|
fields = cls._preferences_fields + cls._context_fields
|
|
user_id = Transaction().user
|
|
user = cls(user_id)
|
|
for field in values:
|
|
if field not in fields or field == 'groups':
|
|
del values_clean[field]
|
|
if field == 'language':
|
|
langs = Lang.search([
|
|
('code', '=', values['language']),
|
|
])
|
|
if langs:
|
|
values_clean['language'] = langs[0].id
|
|
else:
|
|
del values_clean['language']
|
|
# Set new context to write as validation could depend on it
|
|
context = {}
|
|
for name in cls._context_fields:
|
|
if name in values:
|
|
context[name] = values[name]
|
|
with Transaction().set_context(context):
|
|
cls.write([user], values_clean)
|
|
|
|
@classmethod
|
|
def get_preferences_fields_view(cls):
|
|
pool = Pool()
|
|
ModelData = pool.get('ir.model.data')
|
|
Lang = pool.get('ir.lang')
|
|
Action = pool.get('ir.action')
|
|
|
|
view_id = ModelData.get_id('res', 'user_view_form_preferences')
|
|
res = cls.fields_view_get(view_id=view_id)
|
|
res = copy.deepcopy(res)
|
|
for field in res['fields']:
|
|
if field not in ('groups', 'language_direction'):
|
|
res['fields'][field]['readonly'] = False
|
|
else:
|
|
res['fields'][field]['readonly'] = True
|
|
|
|
def convert2selection(definition, name):
|
|
del definition[name]['relation']
|
|
del definition[name]['domain']
|
|
definition[name]['type'] = 'selection'
|
|
selection = []
|
|
definition[name]['selection'] = selection
|
|
return selection
|
|
|
|
if 'language' in res['fields']:
|
|
selection = convert2selection(res['fields'], 'language')
|
|
langs = Lang.search(cls.language.domain)
|
|
lang_ids = [l.id for l in langs]
|
|
with Transaction().set_context(translate_name=True):
|
|
for lang in Lang.browse(lang_ids):
|
|
selection.append((lang.code, lang.name))
|
|
if 'action' in res['fields']:
|
|
selection = convert2selection(res['fields'], 'action')
|
|
selection.append((None, ''))
|
|
actions = Action.search([])
|
|
for action in actions:
|
|
selection.append((action.id, action.rec_name))
|
|
if 'menu' in res['fields']:
|
|
selection = convert2selection(res['fields'], 'menu')
|
|
actions = Action.search([
|
|
('usage', '=', 'menu'),
|
|
])
|
|
for action in actions:
|
|
selection.append((action.id, action.rec_name))
|
|
return res
|
|
|
|
@classmethod
|
|
def get_groups(cls):
|
|
'''
|
|
Return an ordered tuple of all group ids for the user
|
|
'''
|
|
pool = Pool()
|
|
UserGroup = pool.get('res.user-res.group')
|
|
transaction = Transaction()
|
|
user = transaction.user
|
|
groups = cls._get_groups_cache.get(user)
|
|
if groups is not None:
|
|
return groups
|
|
cursor = transaction.connection.cursor()
|
|
user_group = UserGroup.user_group_all_table()
|
|
cursor.execute(*user_group.select(
|
|
user_group.group,
|
|
where=user_group.user == user,
|
|
order_by=[user_group.group.asc]))
|
|
groups = tuple(g for g, in cursor)
|
|
cls._get_groups_cache.set(user, groups)
|
|
return groups
|
|
|
|
@classmethod
|
|
def _get_login(cls, login):
|
|
cursor = Transaction().connection.cursor()
|
|
table = cls.__table__()
|
|
cursor.execute(*table.select(table.id, table.password_hash,
|
|
Case(
|
|
(table.password_reset_expire > CurrentTimestamp(),
|
|
table.password_reset),
|
|
else_=None),
|
|
where=(table.login == login)
|
|
& (table.active == Literal(True))))
|
|
return cursor.fetchone() or (None, None, None)
|
|
|
|
@classmethod
|
|
def get_login(cls, login, parameters):
|
|
'''
|
|
Return user id if password matches
|
|
'''
|
|
pool = Pool()
|
|
LoginAttempt = pool.get('res.user.login.attempt')
|
|
UserDevice = pool.get('res.user.device')
|
|
parameters = parameters.copy()
|
|
|
|
count_ip = LoginAttempt.count_ip()
|
|
if count_ip > config.getint(
|
|
'session', 'max_attempt_ip_network', default=300):
|
|
# Do not add attempt as the goal is to prevent flooding
|
|
raise RateLimitException()
|
|
device_cookie = UserDevice.get_valid_cookie(
|
|
login, parameters.get('device_cookie'))
|
|
parameters['device_cookie'] = device_cookie
|
|
count = LoginAttempt.count(login, device_cookie)
|
|
if count > config.getint('session', 'max_attempt', default=5):
|
|
LoginAttempt.add(login, device_cookie)
|
|
raise RateLimitException()
|
|
Transaction().atexit(time.sleep, random.randint(0, 2 ** count - 1))
|
|
for methods in config.get(
|
|
'session', 'authentications', default='password').split(','):
|
|
user_ids = set()
|
|
for method in methods.split('+'):
|
|
if user_ids:
|
|
try:
|
|
method, options = method.split('?', 1)
|
|
except ValueError:
|
|
options = []
|
|
else:
|
|
options = options.split(':')
|
|
if cls._check_login_options(options, login, parameters):
|
|
continue
|
|
try:
|
|
func = getattr(cls, '_login_%s' % method)
|
|
except AttributeError:
|
|
logger.info('Missing login method: %s', method)
|
|
break
|
|
user_ids.add(func(login, parameters))
|
|
if len(user_ids) != 1 or not all(user_ids):
|
|
break
|
|
if len(user_ids) == 1 and all(user_ids):
|
|
LoginAttempt.remove(login, device_cookie)
|
|
return user_ids.pop()
|
|
LoginAttempt.add(login, device_cookie)
|
|
|
|
@classmethod
|
|
def _check_login_options(cls, options, login, parameters):
|
|
for option in options:
|
|
try:
|
|
func = getattr(cls, '_check_login_options_%s' % option)
|
|
except AttributeError:
|
|
logger.info("Missing login option: %s", option)
|
|
continue
|
|
if func(login, parameters):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@classmethod
|
|
def _check_login_options_ip_address(cls, login, parameters):
|
|
context = Transaction().context
|
|
if context.get('_request') and context['_request'].get('remote_addr'):
|
|
ip_address = ipaddress.ip_address(
|
|
str(context['_request']['remote_addr']))
|
|
network_list = config.get('session', 'authentication_ip_network')
|
|
if network_list:
|
|
for network in network_list.split(','):
|
|
ip_network = ipaddress.ip_network(network)
|
|
if ip_address in ip_network:
|
|
return True
|
|
return False
|
|
|
|
@classmethod
|
|
def _check_login_options_device_cookie(cls, login, parameters):
|
|
return bool(parameters.get('device_cookie'))
|
|
|
|
@classmethod
|
|
def _login_password(cls, login, parameters):
|
|
if 'password' not in parameters:
|
|
msg = gettext('res.msg_user_password', login=login)
|
|
raise LoginException('password', msg, type='password')
|
|
user_id, password_hash, password_reset = cls._get_login(login)
|
|
if user_id and password_hash:
|
|
password = parameters['password']
|
|
valid, new_hash = cls.check_password(password, password_hash)
|
|
if valid:
|
|
if new_hash:
|
|
logger.info("Update password hash for %s", user_id)
|
|
with Transaction().new_transaction():
|
|
with without_check_access():
|
|
cls.write([cls(user_id)], {
|
|
'password_hash': new_hash,
|
|
})
|
|
return user_id
|
|
if user_id and password_reset:
|
|
if password_reset == parameters['password']:
|
|
return user_id
|
|
|
|
@classmethod
|
|
def hash_password(cls, password):
|
|
'''Hash given password in the form
|
|
<hash_method>$<password>$<salt>...'''
|
|
if not password:
|
|
return None
|
|
return CRYPT_CONTEXT.hash(password)
|
|
|
|
@classmethod
|
|
def check_password(cls, password, hash_):
|
|
if not hash_:
|
|
return False
|
|
try:
|
|
return CRYPT_CONTEXT.verify_and_update(password, hash_)
|
|
except ValueError:
|
|
hash_method = hash_.split('$', 1)[0]
|
|
warnings.warn(
|
|
"Use deprecated hash method %s" % hash_method,
|
|
DeprecationWarning)
|
|
valid = getattr(cls, 'check_' + hash_method)(password, hash_)
|
|
if valid:
|
|
new_hash = CRYPT_CONTEXT.hash(password)
|
|
else:
|
|
new_hash = None
|
|
return valid, new_hash
|
|
|
|
@classmethod
|
|
def hash_sha1(cls, password):
|
|
salt = gen_password()
|
|
salted_password = password + salt
|
|
if isinstance(salted_password, str):
|
|
salted_password = salted_password.encode('utf-8')
|
|
hash_ = hashlib.sha1(salted_password).hexdigest()
|
|
return '$'.join(['sha1', hash_, salt])
|
|
|
|
@classmethod
|
|
def check_sha1(cls, password, hash_):
|
|
if isinstance(password, str):
|
|
password = password.encode('utf-8')
|
|
hash_method, hash_, salt = hash_.split('$', 2)
|
|
salt = salt or ''
|
|
if isinstance(salt, str):
|
|
salt = salt.encode('utf-8')
|
|
assert hash_method == 'sha1'
|
|
return hash_ == hashlib.sha1(password + salt).hexdigest()
|
|
|
|
@classmethod
|
|
def hash_bcrypt(cls, password):
|
|
if isinstance(password, str):
|
|
password = password.encode('utf-8')
|
|
hash_ = bcrypt.hashpw(password, bcrypt.gensalt()).decode('utf-8')
|
|
return '$'.join(['bcrypt', hash_])
|
|
|
|
@classmethod
|
|
def check_bcrypt(cls, password, hash_):
|
|
if isinstance(password, str):
|
|
password = password.encode('utf-8')
|
|
hash_method, hash_ = hash_.split('$', 1)
|
|
if isinstance(hash_, str):
|
|
hash_ = hash_.encode('utf-8')
|
|
assert hash_method == 'bcrypt'
|
|
return hash_ == bcrypt.hashpw(password, hash_)
|
|
|
|
|
|
class LoginAttempt(ModelSQL):
|
|
"""Login Attempt
|
|
|
|
This class is separated from the res.user one in order to prevent locking
|
|
the res.user table when in a long running process.
|
|
"""
|
|
__name__ = 'res.user.login.attempt'
|
|
login = fields.Char('Login', size=512)
|
|
device_cookie = fields.Char("Device Cookie", strip=False)
|
|
ip_address = fields.Char("IP Address")
|
|
ip_network = fields.Char("IP Network")
|
|
|
|
@staticmethod
|
|
def delay():
|
|
return (datetime.datetime.now()
|
|
- datetime.timedelta(seconds=config.getint('session', 'timeout')))
|
|
|
|
@classmethod
|
|
def ipaddress(cls):
|
|
context = Transaction().context
|
|
ip_address = ''
|
|
ip_network = ''
|
|
if context.get('_request') and context['_request'].get('remote_addr'):
|
|
ip_address = ipaddress.ip_address(
|
|
str(context['_request']['remote_addr']))
|
|
prefix = config.getint(
|
|
'session', 'ip_network_%s' % ip_address.version)
|
|
ip_network = ipaddress.ip_network(
|
|
str(context['_request']['remote_addr']))
|
|
ip_network = ip_network.supernet(new_prefix=prefix)
|
|
return ip_address, ip_network
|
|
|
|
def _login_size(func):
|
|
@wraps(func)
|
|
def wrapper(cls, login, *args, **kwargs):
|
|
return func(cls, login[:cls.login.size], *args, **kwargs)
|
|
return wrapper
|
|
|
|
@classmethod
|
|
@_login_size
|
|
def add(cls, login, device_cookie=None):
|
|
cursor = Transaction().connection.cursor()
|
|
table = cls.__table__()
|
|
cursor.execute(*table.delete(where=table.create_date < cls.delay()))
|
|
|
|
ip_address, ip_network = cls.ipaddress()
|
|
cls.create([{
|
|
'login': login,
|
|
'device_cookie': device_cookie,
|
|
'ip_address': str(ip_address),
|
|
'ip_network': str(ip_network),
|
|
}])
|
|
|
|
@classmethod
|
|
@_login_size
|
|
def remove(cls, login, device_cookie=None):
|
|
cursor = Transaction().connection.cursor()
|
|
table = cls.__table__()
|
|
cursor.execute(*table.delete(
|
|
where=(table.login == login)
|
|
& (table.device_cookie == device_cookie)
|
|
))
|
|
|
|
@classmethod
|
|
@_login_size
|
|
def count(cls, login, device_cookie=None):
|
|
cursor = Transaction().connection.cursor()
|
|
table = cls.__table__()
|
|
cursor.execute(*table.select(Count(Literal('*')),
|
|
where=(table.login == login)
|
|
& (table.device_cookie == device_cookie)
|
|
& (table.create_date >= cls.delay())))
|
|
return cursor.fetchone()[0]
|
|
|
|
@classmethod
|
|
def count_ip(cls):
|
|
cursor = Transaction().connection.cursor()
|
|
table = cls.__table__()
|
|
_, ip_network = cls.ipaddress()
|
|
cursor.execute(*table.select(Count(Literal('*')),
|
|
where=(table.ip_network == str(ip_network))
|
|
& (table.create_date >= cls.delay())))
|
|
return cursor.fetchone()[0]
|
|
|
|
del _login_size
|
|
|
|
|
|
class UserDevice(ModelSQL):
|
|
"User Device"
|
|
__name__ = 'res.user.device'
|
|
|
|
login = fields.Char("Login", required=True)
|
|
cookie = fields.Char("Cookie", readonly=True, required=True, strip=False)
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
cls.__rpc__.update({
|
|
'renew': RPC(readonly=False),
|
|
})
|
|
|
|
@classmethod
|
|
def get_valid_cookie(cls, login, cookie):
|
|
try:
|
|
device, = cls.search([
|
|
('login', '=', login),
|
|
('cookie', '=', cookie),
|
|
], limit=1)
|
|
except ValueError:
|
|
return None
|
|
|
|
return device.cookie
|
|
|
|
@classmethod
|
|
def renew(cls, current_cookie):
|
|
pool = Pool()
|
|
User = pool.get('res.user')
|
|
|
|
user = User(Transaction().user)
|
|
new_cookie = uuid.uuid4().hex
|
|
current_devices = cls.search([
|
|
('login', '=', user.login),
|
|
('cookie', '=', current_cookie),
|
|
])
|
|
if current_devices:
|
|
cls.write(current_devices, {
|
|
'cookie': new_cookie
|
|
})
|
|
else:
|
|
cls.create([{
|
|
'login': user.login,
|
|
'cookie': new_cookie,
|
|
}])
|
|
return new_cookie
|
|
|
|
@classmethod
|
|
def clear(cls, logins):
|
|
for sub_logins in grouped_slice(logins):
|
|
cls.delete(cls.search([
|
|
('login', 'in', list(sub_logins)),
|
|
]))
|
|
|
|
|
|
class UserAction(ModelSQL):
|
|
'User - Action'
|
|
__name__ = 'res.user-ir.action'
|
|
user = fields.Many2One('res.user', 'User', ondelete='CASCADE',
|
|
required=True)
|
|
action = fields.Many2One('ir.action', 'Action', ondelete='CASCADE',
|
|
required=True)
|
|
|
|
@staticmethod
|
|
def _convert_values(values):
|
|
pool = Pool()
|
|
Action = pool.get('ir.action')
|
|
values = values.copy()
|
|
if values.get('action'):
|
|
values['action'] = Action.get_action_id(values['action'])
|
|
return values
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
vlist = [cls._convert_values(values) for values in vlist]
|
|
return super(UserAction, cls).create(vlist)
|
|
|
|
@classmethod
|
|
def write(cls, records, values, *args):
|
|
actions = iter((records, values) + args)
|
|
args = []
|
|
for records, values in zip(actions, actions):
|
|
args.extend((records, cls._convert_values(values)))
|
|
super(UserAction, cls).write(*args)
|
|
|
|
|
|
class UserGroup(ModelSQL):
|
|
'User - Group'
|
|
__name__ = 'res.user-res.group'
|
|
user = fields.Many2One('res.user', 'User', ondelete='CASCADE',
|
|
required=True)
|
|
group = fields.Many2One('res.group', 'Group', ondelete='CASCADE',
|
|
required=True)
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
records = super().create(vlist)
|
|
pool = Pool()
|
|
# Restart the cache for get_groups
|
|
pool.get('res.user')._get_groups_cache.clear()
|
|
return records
|
|
|
|
@classmethod
|
|
def write(cls, groups, values, *args):
|
|
super().write(groups, values, *args)
|
|
pool = Pool()
|
|
# Restart the cache for get_groups
|
|
pool.get('res.user')._get_groups_cache.clear()
|
|
|
|
@classmethod
|
|
def delete(cls, groups):
|
|
super().delete(groups)
|
|
pool = Pool()
|
|
# Restart the cache for get_groups
|
|
pool.get('res.user')._get_groups_cache.clear()
|
|
|
|
@classmethod
|
|
def user_group_all_table(cls):
|
|
pool = Pool()
|
|
Group = pool.get('res.group')
|
|
user_group = cls.__table__()
|
|
group_parents = Group.group_parent_all_cte()
|
|
|
|
return (user_group
|
|
.join(group_parents,
|
|
condition=user_group.group == group_parents.id)
|
|
.select(
|
|
user_group.user.as_('user'),
|
|
group_parents.parent.as_('group'),
|
|
where=group_parents.parent != Null,
|
|
with_=group_parents))
|
|
|
|
|
|
class Warning_(ModelSQL, ModelView):
|
|
'User Warning'
|
|
__name__ = 'res.user.warning'
|
|
|
|
user = fields.Many2One('res.user', 'User', required=True)
|
|
name = fields.Char('Name', required=True)
|
|
always = fields.Boolean('Always')
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
cls.__access__.add('user')
|
|
cls.__rpc__.update(
|
|
skip=RPC(check_access=False, readonly=False),
|
|
)
|
|
|
|
@classmethod
|
|
def skip(cls, name, always=False):
|
|
cls.create([{
|
|
'name': name,
|
|
'user': Transaction().user,
|
|
'always': always,
|
|
}])
|
|
|
|
@classmethod
|
|
def format(cls, name, records):
|
|
key = '|'.join(map(str, records)).encode('utf-8')
|
|
return '%s.%s' % (hashlib.md5(key).hexdigest(), name)
|
|
|
|
@classmethod
|
|
@without_check_access
|
|
def check(cls, warning_name):
|
|
transaction = Transaction()
|
|
user = transaction.user
|
|
context = transaction.context
|
|
if not user or context.get('_skip_warnings'):
|
|
return False
|
|
key = (user, warning_name)
|
|
if key in transaction.check_warnings:
|
|
return False
|
|
warnings = cls.search([
|
|
('user', '=', user),
|
|
('name', '=', warning_name),
|
|
])
|
|
if not warnings:
|
|
return True
|
|
transaction.check_warnings.add(key)
|
|
cls.delete([x for x in warnings if not x.always])
|
|
return False
|
|
|
|
|
|
class UserApplication(Workflow, ModelSQL, ModelView):
|
|
"User Application"
|
|
__name__ = 'res.user.application'
|
|
_rec_name = 'key'
|
|
|
|
key = fields.Char("Key", required=True, strip=False)
|
|
user = fields.Many2One('res.user', "User")
|
|
application = fields.Selection([], "Application", required=True)
|
|
state = fields.Selection([
|
|
('requested', "Requested"),
|
|
('validated', "Validated"),
|
|
('cancelled', "Cancelled"),
|
|
], "State", readonly=True, sort=False)
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(UserApplication, cls).__setup__()
|
|
cls._transitions |= set((
|
|
('requested', 'validated'),
|
|
('requested', 'cancelled'),
|
|
('validated', 'cancelled'),
|
|
))
|
|
cls._buttons.update({
|
|
'validate_': {
|
|
'invisible': Eval('state') != 'requested',
|
|
'depends': ['state'],
|
|
},
|
|
'cancel': {
|
|
'invisible': Eval('state') == 'cancelled',
|
|
'depends': ['state'],
|
|
},
|
|
})
|
|
# Do not cache default_key as it depends on time
|
|
cls.__rpc__['default_get'].cache = None
|
|
|
|
table = cls.__table__()
|
|
cls._sql_indexes.update({
|
|
Index(table, (table.key, Index.Equality())),
|
|
Index(
|
|
table,
|
|
(table.user, Index.Equality()),
|
|
(table.state, Index.Equality())),
|
|
Index(
|
|
table,
|
|
(table.key, Index.Equality()),
|
|
(table.application, Index.Equality()),
|
|
(table.state, Index.Equality())),
|
|
})
|
|
|
|
@classmethod
|
|
def default_key(cls):
|
|
return ''.join(uuid.uuid4().hex for _ in range(4))
|
|
|
|
@classmethod
|
|
def default_state(cls):
|
|
return 'validated'
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('validated')
|
|
def validate_(cls, applications):
|
|
pass
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('cancelled')
|
|
def cancel(cls, applications):
|
|
pass
|
|
|
|
@classmethod
|
|
def count(cls, user_id):
|
|
return cls.search([
|
|
('user', '=', user_id),
|
|
('state', '=', 'requested'),
|
|
], count=True)
|
|
|
|
@classmethod
|
|
def check(cls, key, application):
|
|
records = cls.search([
|
|
('key', '=', key),
|
|
('application', '=', application),
|
|
('state', '=', 'validated'),
|
|
], limit=1)
|
|
if not records:
|
|
return
|
|
record, = records
|
|
return record
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
vlist = [v.copy() for v in vlist]
|
|
for values in vlist:
|
|
# Ensure we get a different key for each record
|
|
# default methods are called only once
|
|
values.setdefault('key', cls.default_key())
|
|
applications = super(UserApplication, cls).create(vlist)
|
|
return applications
|
|
|
|
|
|
class EmailResetPassword(Report):
|
|
__name__ = 'res.user.email_reset_password'
|
|
|
|
@classmethod
|
|
def get_context(cls, records, header, data):
|
|
context = super().get_context(records, header, data)
|
|
context['host'] = host()
|
|
context['http_host'] = http_host()
|
|
context['database'] = Transaction().database.name
|
|
expire_delay = (
|
|
records[0].password_reset_expire - datetime.datetime.now())
|
|
# Use a precision of minutes
|
|
expire_delay = datetime.timedelta(
|
|
days=expire_delay.days,
|
|
minutes=round(expire_delay.seconds / 60))
|
|
context['expire_delay'] = expire_delay
|
|
return context
|
|
|
|
|
|
class UserConfigStart(ModelView):
|
|
'User Config Init'
|
|
__name__ = 'res.user.config.start'
|
|
|
|
|
|
class UserConfig(Wizard):
|
|
'Configure users'
|
|
__name__ = 'res.user.config'
|
|
|
|
start = StateView('res.user.config.start',
|
|
'res.user_config_start_view_form', [
|
|
Button('Cancel', 'end', 'tryton-cancel'),
|
|
Button('OK', 'user', 'tryton-ok', default=True),
|
|
])
|
|
user = StateView('res.user',
|
|
'res.user_view_form', [
|
|
Button('End', 'end', 'tryton-cancel'),
|
|
Button('Add', 'add', 'tryton-ok'),
|
|
])
|
|
add = StateTransition()
|
|
|
|
def transition_add(self):
|
|
pool = Pool()
|
|
User = pool.get('res.user')
|
|
self.user.save()
|
|
self.user = User()
|
|
return 'user'
|