Files
tradon/tests/test_trigger.py
2025-12-26 13:11:43 +00:00

482 lines
15 KiB
Python
Executable File

# -*- coding: utf-8 -*-
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
import unittest
from itertools import combinations
from trytond.ir.exceptions import TriggerConditionError
from trytond.model.exceptions import SQLConstraintError
from trytond.pool import Pool
from trytond.pyson import Eval, PYSONEncoder
from trytond.tests.test_tryton import activate_module, with_transaction
from trytond.tests.trigger import TRIGGER_LOGS
from trytond.transaction import Transaction
class TriggerTestCase(unittest.TestCase):
'Test Trigger'
@classmethod
def setUpClass(cls):
activate_module('tests')
def setUp(self):
TRIGGER_LOGS.clear()
def run_tasks(self):
pool = Pool()
Queue = pool.get('ir.queue')
transaction = Transaction()
while transaction.tasks:
task = Queue(transaction.tasks.pop())
task.run()
@with_transaction()
def test_constraints(self):
'Test constraints'
pool = Pool()
Model = pool.get('ir.model')
Trigger = pool.get('ir.trigger')
transaction = Transaction()
model, = Model.search([
('model', '=', 'test.triggered'),
])
values = {
'name': 'Test',
'model': model.id,
'on_time': True,
'condition': 'true',
'action': 'test.trigger_action|trigger',
}
self.assertTrue(Trigger.create([values]))
transaction.rollback()
# on_exclusive
for i in range(1, 4):
for combination in combinations(
['create', 'write', 'delete'], i):
combination_values = values.copy()
for mode in combination:
combination_values['on_%s' % mode] = True
self.assertRaises(SQLConstraintError, Trigger.create,
[combination_values])
transaction.rollback()
# check_condition
condition_values = values.copy()
condition_values['condition'] = '='
self.assertRaises(TriggerConditionError, Trigger.create,
[condition_values])
transaction.rollback()
# Restart the cache on the get_triggers method of ir.trigger
Trigger._get_triggers_cache.clear()
@with_transaction()
def test_on_create(self):
'Test on_create'
pool = Pool()
Model = pool.get('ir.model')
Trigger = pool.get('ir.trigger')
Triggered = pool.get('test.triggered')
model, = Model.search([
('model', '=', 'test.triggered'),
])
trigger, = Trigger.create([{
'name': 'Test',
'model': model.id,
'on_create': True,
'condition': 'true',
'action': 'test.trigger_action|trigger',
}])
triggered, = Triggered.create([{
'name': 'Test',
}])
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# Trigger with condition
condition = PYSONEncoder().encode(
Eval('self', {}).get('name') == 'Bar')
Trigger.write([trigger], {
'condition': condition,
})
# Matching condition
triggered, = Triggered.create([{
'name': 'Bar',
}])
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# Non matching condition
triggered, = Triggered.create([{
'name': 'Foo',
}])
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [])
# With limit number
Trigger.write([trigger], {
'condition': 'true',
'limit_number': 1,
})
triggered, = Triggered.create([{
'name': 'Test',
}])
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# With minimum delay
Trigger.write([trigger], {
'limit_number': 0,
'minimum_time_delay': datetime.timedelta(hours=1),
})
triggered, = Triggered.create([{
'name': 'Test',
}])
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# Restart the cache on the get_triggers method of ir.trigger
Trigger._get_triggers_cache.clear()
@with_transaction()
def test_on_write(self):
'Test on_write'
pool = Pool()
Model = pool.get('ir.model')
Trigger = pool.get('ir.trigger')
TriggerLog = pool.get('ir.trigger.log')
Triggered = pool.get('test.triggered')
model, = Model.search([
('model', '=', 'test.triggered'),
])
trigger, = Trigger.create([{
'name': 'Test',
'model': model.id,
'on_write': True,
'condition': 'true',
'action': 'test.trigger_action|trigger',
}])
triggered, = Triggered.create([{
'name': 'Test',
}])
Triggered.write([triggered], {
'name': 'Foo',
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [])
# Trigger with condition
condition = PYSONEncoder().encode(
Eval('self', {}).get('name') == 'Bar')
Trigger.write([trigger], {
'condition': condition,
})
# Matching condition
Triggered.write([triggered], {
'name': 'Bar',
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# No change in condition
Triggered.write([triggered], {
'name': 'Bar',
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [])
# Different change in condition
Triggered.write([triggered], {
'name': 'Foo',
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [])
# With limit number
condition = PYSONEncoder().encode(
Eval('self', {}).get('name') == 'Bar')
Trigger.write([trigger], {
'condition': condition,
'limit_number': 1,
})
triggered, = Triggered.create([{
'name': 'Foo',
}])
Triggered.write([triggered], {
'name': 'Bar',
})
Triggered.write([triggered], {
'name': 'Foo',
})
Triggered.write([triggered], {
'name': 'Bar',
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# With minimum delay
Trigger.write([trigger], {
'limit_number': 0,
'minimum_time_delay': datetime.timedelta.max,
})
triggered, = Triggered.create([{
'name': 'Foo',
}])
for name in ('Bar', 'Foo', 'Bar'):
Triggered.write([triggered], {
'name': name,
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
Trigger.write([trigger], {
'minimum_time_delay': datetime.timedelta(seconds=1),
})
triggered, = Triggered.create([{
'name': 'Foo',
}])
for name in ('Bar', 'Foo'):
Triggered.write([triggered], {
'name': name,
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
Transaction().trigger_records.clear()
# Make time pass by moving back in time the log creation
trigger_log = TriggerLog.__table__()
cursor = Transaction().connection.cursor()
cursor.execute(*trigger_log.update(
[trigger_log.create_date],
[datetime.datetime.now() - datetime.timedelta(days=1)],
where=trigger_log.record_id == triggered.id))
Triggered.write([triggered], {
'name': 'Bar',
})
self.run_tasks()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# Restart the cache on the get_triggers method of ir.trigger
Trigger._get_triggers_cache.clear()
@with_transaction()
def test_on_delete(self):
'Test on_delete'
pool = Pool()
Model = pool.get('ir.model')
Trigger = pool.get('ir.trigger')
Triggered = pool.get('test.triggered')
TriggerLog = pool.get('ir.trigger.log')
model, = Model.search([
('model', '=', 'test.triggered'),
])
triggered, = Triggered.create([{
'name': 'Test',
}])
trigger, = Trigger.create([{
'name': 'Test',
'model': model.id,
'on_delete': True,
'condition': 'true',
'action': 'test.trigger_action|trigger',
}])
Triggered.delete([triggered])
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
Transaction().delete = {}
# Trigger with condition
condition = PYSONEncoder().encode(
Eval('self', {}).get('name') == 'Bar')
Trigger.write([trigger], {
'condition': condition,
})
triggered, = Triggered.create([{
'name': 'Bar',
}])
# Matching condition
Triggered.delete([triggered])
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
Transaction().delete = {}
triggered, = Triggered.create([{
'name': 'Foo',
}])
# Non matching condition
Triggered.delete([triggered])
self.assertEqual(TRIGGER_LOGS, [])
Transaction().delete = {}
triggered, = Triggered.create([{
'name': 'Test',
}])
# With limit number
Trigger.write([trigger], {
'condition': 'true',
'limit_number': 1,
})
Triggered.delete([triggered])
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
Transaction().delete = {}
# Delete trigger logs because SQLite reuse the same triggered_id
TriggerLog.delete(TriggerLog.search([
('trigger', '=', trigger.id),
]))
triggered, = Triggered.create([{
'name': 'Test',
}])
# With minimum delay
Trigger.write([trigger], {
'limit_number': 0,
'minimum_time_delay': datetime.timedelta(hours=1),
})
Triggered.delete([triggered])
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
Transaction().delete = {}
# Restart the cache on the get_triggers method of ir.trigger
Trigger._get_triggers_cache.clear()
@with_transaction()
def test_on_time(self):
'Test on_time'
pool = Pool()
Model = pool.get('ir.model')
Trigger = pool.get('ir.trigger')
Triggered = pool.get('test.triggered')
TriggerLog = pool.get('ir.trigger.log')
model, = Model.search([
('model', '=', 'test.triggered'),
])
trigger, = Trigger.create([{
'name': 'Test',
'model': model.id,
'on_time': True,
'condition': 'true',
'action': 'test.trigger_action|trigger',
}])
triggered, = Triggered.create([{
'name': 'Test',
}])
Trigger.trigger_time()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# Trigger with condition
condition = PYSONEncoder().encode(
Eval('self', {}).get('name') == 'Bar')
Trigger.write([trigger], {
'condition': condition,
})
# Matching condition
Triggered.write([triggered], {
'name': 'Bar',
})
Trigger.trigger_time()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# Non matching condition
Triggered.write([triggered], {
'name': 'Foo',
})
Trigger.trigger_time()
self.assertEqual(TRIGGER_LOGS, [])
# With limit number
Trigger.write([trigger], {
'condition': 'true',
'limit_number': 1,
})
Trigger.trigger_time()
Trigger.trigger_time()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
# Delete trigger logs of limit number test
TriggerLog.delete(TriggerLog.search([
('trigger', '=', trigger.id),
]))
# With minimum delay
Trigger.write([trigger], {
'limit_number': 0,
'minimum_time_delay': datetime.timedelta.max,
})
Trigger.trigger_time()
Trigger.trigger_time()
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
TRIGGER_LOGS.pop()
Transaction().delete = {}
# Delete trigger logs of previous minimum delay test
TriggerLog.delete(TriggerLog.search([
('trigger', '=', trigger.id),
]))
Trigger.write([trigger], {
'minimum_time_delay': datetime.timedelta(seconds=1),
})
Trigger.trigger_time()
# Make time pass by moving back in time the log creation
trigger_log = TriggerLog.__table__()
cursor = Transaction().connection.cursor()
cursor.execute(*trigger_log.update(
[trigger_log.create_date],
[datetime.datetime.now() - datetime.timedelta(days=1)],
where=trigger_log.record_id == triggered.id))
Trigger.trigger_time()
self.assertEqual(
TRIGGER_LOGS, [([triggered], trigger), ([triggered], trigger)])
TRIGGER_LOGS.pop()
TRIGGER_LOGS.pop()
Transaction().delete = {}
# Restart the cache on the get_triggers method of ir.trigger
Trigger._get_triggers_cache.clear()