263 lines
9.3 KiB
Python
Executable File
263 lines
9.3 KiB
Python
Executable File
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
import datetime
|
|
|
|
from sql import Literal, Null, With
|
|
from sql.aggregate import Min
|
|
from sql.functions import CurrentTimestamp, Extract
|
|
|
|
from trytond.config import config
|
|
from trytond.model import Index, ModelSQL, fields
|
|
from trytond.pool import Pool
|
|
from trytond.tools import grouped_slice
|
|
from trytond.transaction import (
|
|
Transaction, inactive_records, without_check_access)
|
|
|
|
has_worker = config.getboolean('queue', 'worker', default=False)
|
|
clean_days = config.getint('queue', 'clean_days', default=30)
|
|
batch_size = config.getint('queue', 'batch_size', default=20)
|
|
|
|
|
|
class Queue(ModelSQL):
|
|
"Queue"
|
|
__name__ = 'ir.queue'
|
|
name = fields.Char("Name", required=True)
|
|
|
|
data = fields.Dict(None, "Data")
|
|
|
|
enqueued_at = fields.Timestamp("Enqueued at", required=True)
|
|
dequeued_at = fields.Timestamp("Dequeued at")
|
|
finished_at = fields.Timestamp("Finished at")
|
|
|
|
scheduled_at = fields.Timestamp("Scheduled at",
|
|
help="When the task can start.")
|
|
expected_at = fields.Timestamp("Expected at",
|
|
help="When the task should be done.")
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
table = cls.__table__()
|
|
|
|
cls._sql_indexes.add(
|
|
Index(
|
|
table,
|
|
(table.scheduled_at, Index.Range(nulls_first=True)),
|
|
(table.expected_at, Index.Range(nulls_first=True)),
|
|
(table.dequeued_at, Index.Equality()),
|
|
(table.name, Index.Equality())))
|
|
|
|
@classmethod
|
|
def default_enqueued_at(cls):
|
|
return datetime.datetime.now()
|
|
|
|
@classmethod
|
|
def copy(cls, records, default=None):
|
|
if default is None:
|
|
default = {}
|
|
else:
|
|
default = default.copy()
|
|
default.setdefault('enqueued_at')
|
|
default.setdefault('dequeued_at')
|
|
default.setdefault('finished_at')
|
|
return super(Queue, cls).copy(records, default=default)
|
|
|
|
@classmethod
|
|
def push(cls, name, data, scheduled_at=None, expected_at=None):
|
|
transaction = Transaction()
|
|
database = transaction.database
|
|
cursor = transaction.connection.cursor()
|
|
with without_check_access():
|
|
record, = cls.create([{
|
|
'name': name,
|
|
'data': data,
|
|
'scheduled_at': scheduled_at,
|
|
'expected_at': expected_at,
|
|
}])
|
|
if database.has_channel():
|
|
cursor.execute('NOTIFY "%s"', (cls.__name__,))
|
|
if not has_worker:
|
|
transaction.tasks.append(record.id)
|
|
return record.id
|
|
|
|
@classmethod
|
|
def pull(cls, database, connection, name=None):
|
|
cursor = connection.cursor()
|
|
queue = cls.__table__()
|
|
queue_c = cls.__table__()
|
|
queue_s = cls.__table__()
|
|
|
|
candidates = With('id', 'scheduled_at', 'expected_at',
|
|
query=queue_c.select(
|
|
queue_c.id,
|
|
queue_c.scheduled_at,
|
|
queue_c.expected_at,
|
|
where=((queue_c.name == name) if name else Literal(True))
|
|
& (queue_c.dequeued_at == Null),
|
|
order_by=[
|
|
queue_c.scheduled_at.nulls_first,
|
|
queue_c.expected_at.nulls_first]))
|
|
selected = queue_s.select(
|
|
queue_s.id,
|
|
where=((queue_s.name == name) if name else Literal(True))
|
|
& (queue_s.dequeued_at == Null)
|
|
& ((queue_s.scheduled_at <= CurrentTimestamp())
|
|
| (queue_s.scheduled_at == Null)),
|
|
order_by=[
|
|
queue_s.scheduled_at.nulls_first,
|
|
queue_s.expected_at.nulls_first],
|
|
limit=1)
|
|
if database.has_select_for():
|
|
For = database.get_select_for_skip_locked()
|
|
selected.for_ = For('UPDATE')
|
|
|
|
next_timeout = With('seconds', query=candidates.select(
|
|
Min(Extract('EPOCH',
|
|
candidates.scheduled_at - CurrentTimestamp())
|
|
),
|
|
where=candidates.scheduled_at >= CurrentTimestamp()))
|
|
|
|
task_id, seconds = None, None
|
|
if database.has_returning():
|
|
query = queue.update([queue.dequeued_at], [CurrentTimestamp()],
|
|
where=queue.id.in_(selected),
|
|
with_=[candidates, next_timeout],
|
|
returning=[
|
|
queue.id, next_timeout.select(next_timeout.seconds)])
|
|
cursor.execute(*query)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
task_id, seconds = row
|
|
else:
|
|
query = queue.select(queue.id,
|
|
where=queue.id.in_(selected),
|
|
with_=[candidates])
|
|
cursor.execute(*query)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
task_id, = row
|
|
query = queue.update([queue.dequeued_at], [CurrentTimestamp()],
|
|
where=queue.id == task_id)
|
|
cursor.execute(*query)
|
|
query = next_timeout.select(next_timeout.seconds)
|
|
cursor.execute(*query)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
seconds, = row
|
|
|
|
if not task_id and database.has_channel():
|
|
cursor.execute('LISTEN "%s"', (cls.__name__,))
|
|
return task_id, seconds
|
|
|
|
def run(self):
|
|
transaction = Transaction()
|
|
Model = Pool().get(self.data['model'])
|
|
with transaction.set_user(self.data['user']), \
|
|
transaction.set_context(
|
|
self.data['context'], _skip_warnings=True):
|
|
instances = self.data['instances']
|
|
# Ensure record ids still exist
|
|
if isinstance(instances, int):
|
|
with inactive_records():
|
|
if Model.search([('id', '=', instances)]):
|
|
instances = Model(instances)
|
|
else:
|
|
instances = None
|
|
else:
|
|
ids = set()
|
|
with inactive_records():
|
|
for sub_ids in grouped_slice(instances):
|
|
records = Model.search([('id', 'in', list(sub_ids))])
|
|
ids.update(map(int, records))
|
|
if ids:
|
|
instances = Model.browse(
|
|
[i for i in instances if i in ids])
|
|
else:
|
|
instances = None
|
|
if instances is not None:
|
|
getattr(Model, self.data['method'])(
|
|
instances, *self.data['args'], **self.data['kwargs'])
|
|
if not self.dequeued_at:
|
|
self.dequeued_at = datetime.datetime.now()
|
|
self.finished_at = datetime.datetime.now()
|
|
self.save()
|
|
|
|
@classmethod
|
|
def clean(cls, date=None):
|
|
if date is None:
|
|
date = (
|
|
datetime.datetime.now() - datetime.timedelta(days=clean_days))
|
|
tasks = cls.search(['OR',
|
|
('dequeued_at', '<', date),
|
|
('finished_at', '<', date),
|
|
])
|
|
cls.delete(tasks)
|
|
|
|
@classmethod
|
|
def caller(cls, model):
|
|
return _Model(cls, model)
|
|
|
|
|
|
class _Model(object):
|
|
def __init__(self, queue, model):
|
|
self.__queue = queue
|
|
self.__model = model
|
|
|
|
def __getattr__(self, name):
|
|
return _Method(self.__queue, self.__model, name)
|
|
|
|
|
|
class _Method(object):
|
|
def __init__(self, queue, model, name):
|
|
self.__queue = queue
|
|
self.__model = model
|
|
self.__name = name
|
|
|
|
def __call__(self, instances, *args, **kwargs):
|
|
transaction = Transaction()
|
|
context = transaction.context.copy()
|
|
name = context.pop('queue_name', 'default')
|
|
now = datetime.datetime.now()
|
|
scheduled_at = context.pop('queue_scheduled_at', None)
|
|
if scheduled_at is not None:
|
|
scheduled_at = now + scheduled_at
|
|
expected_at = context.pop('queue_expected_at', None)
|
|
queue_batch = context.pop('queue_batch', None)
|
|
context.pop('_check_access', None)
|
|
context.pop('language', None)
|
|
if expected_at is not None:
|
|
expected_at = now + expected_at
|
|
try:
|
|
instances = list(map(int, instances))
|
|
except TypeError:
|
|
instances = int(instances)
|
|
|
|
def _push(instances):
|
|
data = {
|
|
'model': self.__model.__name__,
|
|
'method': self.__name,
|
|
'user': transaction.user,
|
|
'context': context,
|
|
'instances': instances,
|
|
'args': args,
|
|
'kwargs': kwargs,
|
|
}
|
|
return self.__queue.push(
|
|
name, data,
|
|
scheduled_at=scheduled_at, expected_at=expected_at)
|
|
|
|
if isinstance(instances, list):
|
|
if has_worker and queue_batch:
|
|
if isinstance(queue_batch, bool):
|
|
count = batch_size
|
|
else:
|
|
count = int(queue_batch)
|
|
else:
|
|
count = len(instances)
|
|
task_ids = []
|
|
for sub_instances in grouped_slice(instances, count=count):
|
|
task_ids.append(_push(list(sub_instances)))
|
|
return task_ids
|
|
else:
|
|
return _push(instances)
|