175 lines
5.4 KiB
Python
Executable File
175 lines
5.4 KiB
Python
Executable File
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
import datetime
|
|
import json
|
|
from secrets import token_hex
|
|
|
|
from trytond.cache import Cache
|
|
from trytond.config import config
|
|
from trytond.model import Index, ModelSQL, fields
|
|
|
|
_session_timeout = datetime.timedelta(
|
|
seconds=config.getint('session', 'timeout'))
|
|
_reset_interval = _session_timeout // 10
|
|
|
|
|
|
class Session(ModelSQL):
|
|
"Session"
|
|
__name__ = 'ir.session'
|
|
_rec_name = 'key'
|
|
|
|
key = fields.Char("Key", required=True, strip=False)
|
|
_session_reset_cache = Cache('ir_session.session_reset', context=False)
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(Session, cls).__setup__()
|
|
table = cls.__table__()
|
|
cls.__rpc__ = {}
|
|
cls._sql_indexes.update({
|
|
Index(table,
|
|
(table.key, Index.Equality()),
|
|
(table.create_uid, Index.Equality())),
|
|
Index(table,
|
|
(table.key, Index.Equality()),
|
|
(table.create_date, Index.Equality())),
|
|
Index(table,
|
|
(table.key, Index.Equality()),
|
|
(table.write_date, Index.Equality())),
|
|
})
|
|
|
|
@classmethod
|
|
def default_key(cls, nbytes=None):
|
|
return token_hex(nbytes)
|
|
|
|
@classmethod
|
|
def write(cls, *args):
|
|
super().write(*args)
|
|
for sessions in args[0:None:2]:
|
|
for session in sessions:
|
|
cls._session_reset_cache.set(session.key, session.write_date)
|
|
|
|
@classmethod
|
|
def new(cls, values=None):
|
|
"Create a new session for the transaction user and return the key."
|
|
if values is None:
|
|
values = {}
|
|
session, = cls.create([values])
|
|
return session.key
|
|
|
|
@classmethod
|
|
def remove(cls, key, domain=None):
|
|
"Delete the key session and return the login."
|
|
domain = [
|
|
('key', '=', key),
|
|
domain or [],
|
|
]
|
|
sessions = cls.search(domain)
|
|
if not sessions:
|
|
return
|
|
session, = sessions
|
|
name = session.create_uid.login
|
|
cls.delete(sessions)
|
|
return name
|
|
|
|
@classmethod
|
|
def check(cls, user, key, domain=None):
|
|
"""
|
|
Check user key against max_age and delete old one.
|
|
Return True if key is still valid, False if the key is expired and None
|
|
if the key does not exist.
|
|
"""
|
|
now = datetime.datetime.now()
|
|
timeout = datetime.timedelta(
|
|
seconds=config.getint('session', 'max_age'))
|
|
sessions = cls.search([
|
|
('create_uid', '=', user),
|
|
domain or [],
|
|
])
|
|
find, last_reset = None, None
|
|
to_delete = []
|
|
for session in sessions:
|
|
if abs(session.create_date - now) < timeout:
|
|
if session.key == key:
|
|
find = True
|
|
last_reset = session.write_date or session.create_date
|
|
else:
|
|
if find is None and session.key == key:
|
|
find = False
|
|
to_delete.append(session)
|
|
cls.delete(to_delete)
|
|
if find:
|
|
cls._session_reset_cache.set(key, last_reset)
|
|
return find
|
|
|
|
@classmethod
|
|
def check_timeout(cls, user, key, domain=None):
|
|
"""
|
|
Check user key against timeout.
|
|
Return True if key is still valid otherwise the key is deleted.
|
|
"""
|
|
now = datetime.datetime.now()
|
|
timeout = datetime.timedelta(
|
|
seconds=config.getint('session', 'timeout'))
|
|
session, = cls.search([
|
|
('create_uid', '=', user),
|
|
('key', '=', key),
|
|
domain or [],
|
|
], limit=1)
|
|
timestamp = session.write_date or session.create_date
|
|
valid = abs(timestamp - now) < timeout
|
|
if not valid:
|
|
cls.delete([session])
|
|
return valid
|
|
|
|
@classmethod
|
|
def reset(cls, key, domain=None):
|
|
"Reset key session timestamp"
|
|
now = datetime.datetime.now()
|
|
last_reset = cls._session_reset_cache.get(key)
|
|
if last_reset is None or (now - _reset_interval) > last_reset:
|
|
timestamp = now - _session_timeout
|
|
sessions = cls.search([
|
|
('key', '=', key),
|
|
['OR',
|
|
('create_date', '>=', timestamp),
|
|
('write_date', '>=', timestamp),
|
|
],
|
|
domain or [],
|
|
])
|
|
cls.write(sessions, {})
|
|
|
|
@classmethod
|
|
def clear(cls, users, domain=None):
|
|
"Clear all sessions for users"
|
|
sessions = cls.search([
|
|
('create_uid', 'in', users),
|
|
domain or [],
|
|
])
|
|
cls.delete(sessions)
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
vlist = [v.copy() for v in vlist]
|
|
for values in vlist:
|
|
# Ensure to get a different key for each record
|
|
# default methods are called only once
|
|
values.setdefault('key', cls.default_key())
|
|
return super(Session, cls).create(vlist)
|
|
|
|
|
|
class SessionWizard(ModelSQL):
|
|
"Session Wizard"
|
|
__name__ = 'ir.session.wizard'
|
|
|
|
data = fields.Text('Data')
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(SessionWizard, cls).__setup__()
|
|
cls.__rpc__ = {}
|
|
|
|
@staticmethod
|
|
def default_data():
|
|
return json.dumps({})
|