Files
tradon/ir/session.py
2025-12-26 13:11:43 +00:00

175 lines
5.4 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
import json
from secrets import token_hex
from trytond.cache import Cache
from trytond.config import config
from trytond.model import Index, ModelSQL, fields
_session_timeout = datetime.timedelta(
seconds=config.getint('session', 'timeout'))
_reset_interval = _session_timeout // 10
class Session(ModelSQL):
"Session"
__name__ = 'ir.session'
_rec_name = 'key'
key = fields.Char("Key", required=True, strip=False)
_session_reset_cache = Cache('ir_session.session_reset', context=False)
@classmethod
def __setup__(cls):
super(Session, cls).__setup__()
table = cls.__table__()
cls.__rpc__ = {}
cls._sql_indexes.update({
Index(table,
(table.key, Index.Equality()),
(table.create_uid, Index.Equality())),
Index(table,
(table.key, Index.Equality()),
(table.create_date, Index.Equality())),
Index(table,
(table.key, Index.Equality()),
(table.write_date, Index.Equality())),
})
@classmethod
def default_key(cls, nbytes=None):
return token_hex(nbytes)
@classmethod
def write(cls, *args):
super().write(*args)
for sessions in args[0:None:2]:
for session in sessions:
cls._session_reset_cache.set(session.key, session.write_date)
@classmethod
def new(cls, values=None):
"Create a new session for the transaction user and return the key."
if values is None:
values = {}
session, = cls.create([values])
return session.key
@classmethod
def remove(cls, key, domain=None):
"Delete the key session and return the login."
domain = [
('key', '=', key),
domain or [],
]
sessions = cls.search(domain)
if not sessions:
return
session, = sessions
name = session.create_uid.login
cls.delete(sessions)
return name
@classmethod
def check(cls, user, key, domain=None):
"""
Check user key against max_age and delete old one.
Return True if key is still valid, False if the key is expired and None
if the key does not exist.
"""
now = datetime.datetime.now()
timeout = datetime.timedelta(
seconds=config.getint('session', 'max_age'))
sessions = cls.search([
('create_uid', '=', user),
domain or [],
])
find, last_reset = None, None
to_delete = []
for session in sessions:
if abs(session.create_date - now) < timeout:
if session.key == key:
find = True
last_reset = session.write_date or session.create_date
else:
if find is None and session.key == key:
find = False
to_delete.append(session)
cls.delete(to_delete)
if find:
cls._session_reset_cache.set(key, last_reset)
return find
@classmethod
def check_timeout(cls, user, key, domain=None):
"""
Check user key against timeout.
Return True if key is still valid otherwise the key is deleted.
"""
now = datetime.datetime.now()
timeout = datetime.timedelta(
seconds=config.getint('session', 'timeout'))
session, = cls.search([
('create_uid', '=', user),
('key', '=', key),
domain or [],
], limit=1)
timestamp = session.write_date or session.create_date
valid = abs(timestamp - now) < timeout
if not valid:
cls.delete([session])
return valid
@classmethod
def reset(cls, key, domain=None):
"Reset key session timestamp"
now = datetime.datetime.now()
last_reset = cls._session_reset_cache.get(key)
if last_reset is None or (now - _reset_interval) > last_reset:
timestamp = now - _session_timeout
sessions = cls.search([
('key', '=', key),
['OR',
('create_date', '>=', timestamp),
('write_date', '>=', timestamp),
],
domain or [],
])
cls.write(sessions, {})
@classmethod
def clear(cls, users, domain=None):
"Clear all sessions for users"
sessions = cls.search([
('create_uid', 'in', users),
domain or [],
])
cls.delete(sessions)
@classmethod
def create(cls, vlist):
vlist = [v.copy() for v in vlist]
for values in vlist:
# Ensure to get a different key for each record
# default methods are called only once
values.setdefault('key', cls.default_key())
return super(Session, cls).create(vlist)
class SessionWizard(ModelSQL):
"Session Wizard"
__name__ = 'ir.session.wizard'
data = fields.Text('Data')
@classmethod
def __setup__(cls):
super(SessionWizard, cls).__setup__()
cls.__rpc__ = {}
@staticmethod
def default_data():
return json.dumps({})