Files
tradon/modules/web_user/user.py
2025-12-26 13:11:43 +00:00

559 lines
18 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
import hashlib
import logging
import random
import string
import time
import urllib.parse
import warnings
from secrets import token_hex
try:
import bcrypt
except ImportError:
bcrypt = None
try:
import html2text
except ImportError:
html2text = None
from sql import Literal
from sql.conditionals import Coalesce
from sql.functions import CurrentTimestamp
from sql.operators import Equal
from trytond.config import config
from trytond.exceptions import RateLimitException
from trytond.i18n import gettext
from trytond.model import (
DeactivableMixin, Exclude, Index, ModelSQL, ModelView, Unique,
avatar_mixin, fields)
from trytond.pool import Pool
from trytond.pyson import Eval
from trytond.report import Report, get_email
from trytond.res.user import CRYPT_CONTEXT, LoginAttempt
from trytond.sendmail import send_message_transactional
from trytond.tools.email_ import (
EmailNotValidError, normalize_email, set_from_header, validate_email)
from trytond.transaction import Transaction, without_check_access
from .exceptions import UserValidationError
logger = logging.getLogger(__name__)
def _send_email(from_, users, email_func):
from_cfg = config.get('email', 'from')
for user in users:
msg, title = email_func(user)
set_from_header(msg, from_cfg, from_ or from_cfg)
msg['To'] = user.email
msg['Subject'] = title
send_message_transactional(msg)
def _add_params(url, **params):
parts = urllib.parse.urlsplit(url)
query = urllib.parse.parse_qsl(parts.query)
for key, value in sorted(params.items()):
query.append((key, value))
parts = list(parts)
parts[3] = urllib.parse.urlencode(query)
return urllib.parse.urlunsplit(parts)
def _extract_params(url):
return urllib.parse.parse_qsl(urllib.parse.urlsplit(url).query)
class User(avatar_mixin(100), DeactivableMixin, ModelSQL, ModelView):
'Web User'
__name__ = 'web.user'
_rec_name = 'email'
email = fields.Char(
"E-mail",
states={
'required': Eval('active', True),
})
email_valid = fields.Boolean('E-mail Valid')
email_token = fields.Char("E-mail Token", strip=False)
password_hash = fields.Char('Password Hash')
password = fields.Function(
fields.Char('Password'), 'get_password', setter='set_password')
reset_password_token = fields.Char("Reset Password Token", strip=False)
reset_password_token_expire = fields.Timestamp(
'Reset Password Token Expire')
party = fields.Many2One('party.party', 'Party', ondelete='RESTRICT')
secondary_parties = fields.Many2Many(
'web.user-party.party.secondary', 'user', 'party', "Secondary Parties")
@classmethod
def __setup__(cls):
super(User, cls).__setup__()
table = cls.__table__()
cls._sql_constraints += [
('email_exclude',
Exclude(table, (table.email, Equal),
where=table.active == Literal(True)),
'web_user.msg_user_email_unique'),
]
cls._sql_indexes.update({
Index(table, (table.email, Index.Equality())),
Index(table, (table.email_token, Index.Equality())),
})
cls._buttons.update({
'validate_email': {
'readonly': Eval('email_valid', False),
'depends': ['email_valid'],
},
'reset_password': {
'readonly': ~Eval('email_valid', False),
'depends': ['email_valid'],
},
})
@classmethod
def default_email_valid(cls):
return False
def get_password(self, name):
return 'x' * 10
@classmethod
def set_password(cls, users, name, value):
pool = Pool()
User = pool.get('res.user')
if value == 'x' * 10:
return
if Transaction().user and value:
User.validate_password(value, users)
to_write = []
for user in users:
to_write.extend([[user], {
'password_hash': cls.hash_password(value),
}])
cls.write(*to_write)
@fields.depends('party', 'email')
def on_change_party(self):
if not self.email and self.party:
self.email = self.party.email
@classmethod
def _format_email(cls, users):
for user in users:
email = normalize_email(user.email).lower()
if email != user.email:
user.email = email
cls.save(users)
@classmethod
def copy(cls, users, default=None):
default = default.copy() if default is not None else {}
default['password_hash'] = None
default['reset_password_token'] = None
return super().copy(users, default=default)
@classmethod
def create(cls, vlist):
users = super(User, cls).create(vlist)
cls._format_email(users)
return users
@classmethod
def write(cls, *args):
super(User, cls).write(*args)
users = sum(args[0:None:2], [])
cls._format_email(users)
@classmethod
def validate_fields(cls, users, fields_names):
super().validate_fields(users, fields_names)
cls.check_valid_email(users, fields_names)
@classmethod
def check_valid_email(cls, users, fields_names=None):
if fields_names and 'email' not in fields_names:
return
for user in users:
if user.email:
try:
validate_email(user.email)
except EmailNotValidError as e:
raise UserValidationError(gettext(
'web_user.msg_user_email_invalid',
user=user.rec_name,
email=user.email),
str(e)) from e
@classmethod
def authenticate(cls, email, password):
pool = Pool()
Attempt = pool.get('web.user.authenticate.attempt')
email = email.lower()
count_ip = Attempt.count_ip()
if count_ip > config.getint(
'session', 'max_attempt_ip_network', default=300):
# Do not add attempt as the goal is to prevent flooding
raise RateLimitException()
count = Attempt.count(email)
if count > config.getint('session', 'max_attempt', default=5):
Attempt.add(email)
raise RateLimitException()
# Prevent brute force attack
Transaction().atexit(time.sleep, 2 ** count - 1)
users = cls.search([('email', '=', email)])
if users:
user, = users
valid, new_hash = cls.check_password(password, user.password_hash)
if valid:
if new_hash:
logger.info("Update password hash for %s", user.id)
with Transaction().new_transaction():
with without_check_access():
cls.write([cls(user.id)], {
'password_hash': new_hash,
})
Attempt.remove(email)
return user
Attempt.add(email)
@classmethod
def hash_password(cls, password):
'''Hash given password in the form
<hash_method>$<password>$<salt>...'''
if not password:
return ''
return CRYPT_CONTEXT.hash(password)
@classmethod
def check_password(cls, password, hash_):
if not hash_:
return False, None
try:
return CRYPT_CONTEXT.verify_and_update(password, hash_)
except ValueError:
hash_method = hash_.split('$', 1)[0]
warnings.warn(
"Use deprecated hash method %s" % hash_method,
DeprecationWarning)
valid = getattr(cls, 'check_' + hash_method)(password, hash_)
if valid:
new_hash = CRYPT_CONTEXT.hash(password)
else:
new_hash = None
return valid, new_hash
@classmethod
def hash_sha1(cls, password):
salt = ''.join(random.sample(string.ascii_letters + string.digits, 8))
salted_password = password + salt
if isinstance(salted_password, str):
salted_password = salted_password.encode('utf-8')
hash_ = hashlib.sha1(salted_password).hexdigest()
return '$'.join(['sha1', hash_, salt])
@classmethod
def check_sha1(cls, password, hash_):
if isinstance(password, str):
password = password.encode('utf-8')
hash_method, hash_, salt = hash_.split('$', 2)
salt = salt or ''
if isinstance(salt, str):
salt = salt.encode('utf-8')
assert hash_method == 'sha1'
return hash_ == hashlib.sha1(password + salt).hexdigest()
@classmethod
def hash_bcrypt(cls, password):
if isinstance(password, str):
password = password.encode('utf-8')
hash_ = bcrypt.hashpw(password, bcrypt.gensalt()).decode('utf-8')
return '$'.join(['bcrypt', hash_])
@classmethod
def check_bcrypt(cls, password, hash_):
if isinstance(password, str):
password = password.encode('utf-8')
hash_method, hash_ = hash_.split('$', 1)
if isinstance(hash_, str):
hash_ = hash_.encode('utf-8')
assert hash_method == 'bcrypt'
return hash_ == bcrypt.hashpw(password, hash_)
def new_session(self):
pool = Pool()
Session = pool.get('web.user.session')
return Session.add(self)
@classmethod
def get_user(cls, session):
pool = Pool()
Session = pool.get('web.user.session')
return Session.get_user(session)
@classmethod
@ModelView.button
def validate_email(cls, users, from_=None):
for user in users:
user.set_email_token()
cls.save(users)
_send_email(from_, users, cls.get_email_validation)
def set_email_token(self, nbytes=None):
self.email_token = token_hex(nbytes)
def get_email_validation(self):
return get_email(
'web.user.email_validation', self, self.languages)
def get_email_validation_url(self, url=None):
if url is None:
url = config.get('web', 'email_validation_url')
return _add_params(url, token=self.email_token)
@classmethod
def validate_email_url(cls, url):
parts = urllib.parse.urlsplit(url)
tokens = filter(
None, urllib.parse.parse_qs(parts.query).get('token', [None]))
return cls.validate_email_token(list(tokens))
@classmethod
def validate_email_token(cls, tokens):
users = cls.search([
('email_token', 'in', tokens),
])
cls.write(users, {
'email_valid': True,
'email_token': None,
})
return users
@classmethod
@ModelView.button
def reset_password(cls, users, from_=None):
now = datetime.datetime.now()
# Prevent abusive reset
def reset(user):
return not (user.reset_password_token_expire
and user.reset_password_token_expire > now)
users = list(filter(reset, users))
for user in users:
user.set_reset_password_token()
cls.save(users)
_send_email(from_, users, cls.get_email_reset_password)
def set_reset_password_token(self, nbytes=None):
self.reset_password_token = token_hex(nbytes)
self.reset_password_token_expire = (
datetime.datetime.now() + datetime.timedelta(
seconds=config.getint(
'session', 'web_timeout_reset', default=24 * 60 * 60)))
def clear_reset_password_token(self):
self.reset_password_token = None
self.reset_password_token_expire = None
def get_email_reset_password(self):
return get_email(
'web.user.email_reset_password', self, self.languages)
def get_email_reset_password_url(self, url=None):
if url is None:
url = config.get('web', 'reset_password_url')
return _add_params(
url, token=self.reset_password_token, email=self.email)
@classmethod
def set_password_url(cls, url, password):
parts = urllib.parse.urlsplit(url)
query = urllib.parse.parse_qs(parts.query)
email = query.get('email', [None])[0]
token = query.get('token', [None])[0]
return cls.set_password_token(email, token, password)
@classmethod
def set_password_token(cls, email, token, password):
pool = Pool()
Attempt = pool.get('web.user.authenticate.attempt')
email = email.lower()
# Prevent brute force attack
Transaction().atexit(
time.sleep, random.randint(0, 2 ** Attempt.count(email) - 1))
users = cls.search([
('email', '=', email),
])
if users:
user, = users
if user.reset_password_token == token:
now = datetime.datetime.now()
expire = user.reset_password_token_expire
user.clear_reset_password_token()
if expire > now:
user.password = password
user.save()
Attempt.remove(email)
return True
Attempt.add(email)
return False
@property
def languages(self):
pool = Pool()
Language = pool.get('ir.lang')
if self.party and self.party.lang:
languages = [self.party.lang]
else:
languages = Language.search([
('code', '=', Transaction().language),
])
return languages
class User_PartySecondary(ModelSQL):
"Web User - Secondary Party"
__name__ = 'web.user-party.party.secondary'
user = fields.Many2One(
'web.user', "User", required=True, ondelete='CASCADE')
party = fields.Many2One(
'party.party', "Party", required=True, ondelete='CASCADE')
class UserAuthenticateAttempt(LoginAttempt):
'Web User Authenticate Attempt'
__name__ = 'web.user.authenticate.attempt'
_table = None # Needed to reset LoginAttempt._table
class UserSession(ModelSQL):
'Web User Session'
__name__ = 'web.user.session'
_rec_name = 'key'
key = fields.Char("Key", required=True, strip=False)
user = fields.Many2One(
'web.user', "User", required=True, ondelete='CASCADE')
@classmethod
def __setup__(cls):
super(UserSession, cls).__setup__()
table = cls.__table__()
cls.__rpc__ = {}
cls._sql_constraints += [
('key_unique', Unique(table, table.key),
'web_user.msg_user_session_key_unique'),
]
cls._sql_indexes.update({
Index(
table,
(Coalesce(table.write_date, table.create_date),
Index.Range())),
Index(table, (table.key, Index.Equality())),
})
@classmethod
def default_key(cls, nbytes=None):
return token_hex(nbytes)
@classmethod
def add(cls, user):
cursor = Transaction().connection.cursor()
table = cls.__table__()
cursor.execute(*table.delete(
where=(
Coalesce(table.write_date, table.create_date)
< CurrentTimestamp() - cls.timeout())))
session = cls(user=user)
session.save()
return session.key
@classmethod
def remove(cls, key):
sessions = cls.search([
('key', '=', key),
])
cls.delete(sessions)
@classmethod
def get_user(cls, session):
transaction = Transaction()
sessions = cls.search([
('key', '=', session),
])
if not sessions:
return
session, = sessions
if not session.expired:
return session.user
elif not transaction.readonly:
cls.delete([session])
@classmethod
def timeout(cls):
return datetime.timedelta(seconds=config.getint(
'session', 'web_timeout', default=30 * 24 * 60 * 60))
@property
def expired(self):
now = datetime.datetime.now()
timestamp = self.write_date or self.create_date
return abs(timestamp - now) > self.timeout()
@classmethod
def reset(cls, session):
sessions = cls.search([
('key', '=', session),
])
cls.write(sessions, {})
@classmethod
def create(cls, vlist):
vlist = [v.copy() for v in vlist]
for values in vlist:
# Ensure to get a different key for each record
# default methods are called only once
values.setdefault('key', cls.default_key())
return super(UserSession, cls).create(vlist)
class EmailValidation(Report):
__name__ = 'web.user.email_validation'
@classmethod
def get_context(cls, records, header, data):
context = super().get_context(records, header, data)
context['extract_params'] = _extract_params
return context
class EmailResetPassword(Report):
__name__ = 'web.user.email_reset_password'
@classmethod
def get_context(cls, records, header, data):
context = super().get_context(records, header, data)
context['extract_params'] = _extract_params
expire_delay = (
records[0].reset_password_token_expire - datetime.datetime.now())
# Use a precision of minutes
expire_delay = datetime.timedelta(
days=expire_delay.days,
minutes=round(expire_delay.seconds / 60))
context['expire_delay'] = expire_delay
return context