482 lines
15 KiB
Python
Executable File
482 lines
15 KiB
Python
Executable File
# -*- coding: utf-8 -*-
|
|
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
import datetime
|
|
import unittest
|
|
from itertools import combinations
|
|
|
|
from trytond.ir.exceptions import TriggerConditionError
|
|
from trytond.model.exceptions import SQLConstraintError
|
|
from trytond.pool import Pool
|
|
from trytond.pyson import Eval, PYSONEncoder
|
|
from trytond.tests.test_tryton import activate_module, with_transaction
|
|
from trytond.tests.trigger import TRIGGER_LOGS
|
|
from trytond.transaction import Transaction
|
|
|
|
|
|
class TriggerTestCase(unittest.TestCase):
|
|
'Test Trigger'
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
activate_module('tests')
|
|
|
|
def setUp(self):
|
|
TRIGGER_LOGS.clear()
|
|
|
|
def run_tasks(self):
|
|
pool = Pool()
|
|
Queue = pool.get('ir.queue')
|
|
transaction = Transaction()
|
|
while transaction.tasks:
|
|
task = Queue(transaction.tasks.pop())
|
|
task.run()
|
|
|
|
@with_transaction()
|
|
def test_constraints(self):
|
|
'Test constraints'
|
|
pool = Pool()
|
|
Model = pool.get('ir.model')
|
|
Trigger = pool.get('ir.trigger')
|
|
transaction = Transaction()
|
|
|
|
model, = Model.search([
|
|
('model', '=', 'test.triggered'),
|
|
])
|
|
|
|
values = {
|
|
'name': 'Test',
|
|
'model': model.id,
|
|
'on_time': True,
|
|
'condition': 'true',
|
|
'action': 'test.trigger_action|trigger',
|
|
}
|
|
self.assertTrue(Trigger.create([values]))
|
|
|
|
transaction.rollback()
|
|
|
|
# on_exclusive
|
|
for i in range(1, 4):
|
|
for combination in combinations(
|
|
['create', 'write', 'delete'], i):
|
|
combination_values = values.copy()
|
|
for mode in combination:
|
|
combination_values['on_%s' % mode] = True
|
|
self.assertRaises(SQLConstraintError, Trigger.create,
|
|
[combination_values])
|
|
transaction.rollback()
|
|
|
|
# check_condition
|
|
condition_values = values.copy()
|
|
condition_values['condition'] = '='
|
|
self.assertRaises(TriggerConditionError, Trigger.create,
|
|
[condition_values])
|
|
transaction.rollback()
|
|
|
|
# Restart the cache on the get_triggers method of ir.trigger
|
|
Trigger._get_triggers_cache.clear()
|
|
|
|
@with_transaction()
|
|
def test_on_create(self):
|
|
'Test on_create'
|
|
pool = Pool()
|
|
Model = pool.get('ir.model')
|
|
Trigger = pool.get('ir.trigger')
|
|
Triggered = pool.get('test.triggered')
|
|
|
|
model, = Model.search([
|
|
('model', '=', 'test.triggered'),
|
|
])
|
|
|
|
trigger, = Trigger.create([{
|
|
'name': 'Test',
|
|
'model': model.id,
|
|
'on_create': True,
|
|
'condition': 'true',
|
|
'action': 'test.trigger_action|trigger',
|
|
}])
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# Trigger with condition
|
|
condition = PYSONEncoder().encode(
|
|
Eval('self', {}).get('name') == 'Bar')
|
|
Trigger.write([trigger], {
|
|
'condition': condition,
|
|
})
|
|
|
|
# Matching condition
|
|
triggered, = Triggered.create([{
|
|
'name': 'Bar',
|
|
}])
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# Non matching condition
|
|
triggered, = Triggered.create([{
|
|
'name': 'Foo',
|
|
}])
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [])
|
|
|
|
# With limit number
|
|
Trigger.write([trigger], {
|
|
'condition': 'true',
|
|
'limit_number': 1,
|
|
})
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# With minimum delay
|
|
Trigger.write([trigger], {
|
|
'limit_number': 0,
|
|
'minimum_time_delay': datetime.timedelta(hours=1),
|
|
})
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# Restart the cache on the get_triggers method of ir.trigger
|
|
Trigger._get_triggers_cache.clear()
|
|
|
|
@with_transaction()
|
|
def test_on_write(self):
|
|
'Test on_write'
|
|
pool = Pool()
|
|
Model = pool.get('ir.model')
|
|
Trigger = pool.get('ir.trigger')
|
|
TriggerLog = pool.get('ir.trigger.log')
|
|
Triggered = pool.get('test.triggered')
|
|
|
|
model, = Model.search([
|
|
('model', '=', 'test.triggered'),
|
|
])
|
|
|
|
trigger, = Trigger.create([{
|
|
'name': 'Test',
|
|
'model': model.id,
|
|
'on_write': True,
|
|
'condition': 'true',
|
|
'action': 'test.trigger_action|trigger',
|
|
}])
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
|
|
Triggered.write([triggered], {
|
|
'name': 'Foo',
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [])
|
|
|
|
# Trigger with condition
|
|
condition = PYSONEncoder().encode(
|
|
Eval('self', {}).get('name') == 'Bar')
|
|
Trigger.write([trigger], {
|
|
'condition': condition,
|
|
})
|
|
|
|
# Matching condition
|
|
Triggered.write([triggered], {
|
|
'name': 'Bar',
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# No change in condition
|
|
Triggered.write([triggered], {
|
|
'name': 'Bar',
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [])
|
|
|
|
# Different change in condition
|
|
Triggered.write([triggered], {
|
|
'name': 'Foo',
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [])
|
|
|
|
# With limit number
|
|
condition = PYSONEncoder().encode(
|
|
Eval('self', {}).get('name') == 'Bar')
|
|
Trigger.write([trigger], {
|
|
'condition': condition,
|
|
'limit_number': 1,
|
|
})
|
|
triggered, = Triggered.create([{
|
|
'name': 'Foo',
|
|
}])
|
|
Triggered.write([triggered], {
|
|
'name': 'Bar',
|
|
})
|
|
Triggered.write([triggered], {
|
|
'name': 'Foo',
|
|
})
|
|
Triggered.write([triggered], {
|
|
'name': 'Bar',
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# With minimum delay
|
|
Trigger.write([trigger], {
|
|
'limit_number': 0,
|
|
'minimum_time_delay': datetime.timedelta.max,
|
|
})
|
|
triggered, = Triggered.create([{
|
|
'name': 'Foo',
|
|
}])
|
|
for name in ('Bar', 'Foo', 'Bar'):
|
|
Triggered.write([triggered], {
|
|
'name': name,
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
Trigger.write([trigger], {
|
|
'minimum_time_delay': datetime.timedelta(seconds=1),
|
|
})
|
|
triggered, = Triggered.create([{
|
|
'name': 'Foo',
|
|
}])
|
|
for name in ('Bar', 'Foo'):
|
|
Triggered.write([triggered], {
|
|
'name': name,
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
Transaction().trigger_records.clear()
|
|
|
|
# Make time pass by moving back in time the log creation
|
|
trigger_log = TriggerLog.__table__()
|
|
cursor = Transaction().connection.cursor()
|
|
cursor.execute(*trigger_log.update(
|
|
[trigger_log.create_date],
|
|
[datetime.datetime.now() - datetime.timedelta(days=1)],
|
|
where=trigger_log.record_id == triggered.id))
|
|
|
|
Triggered.write([triggered], {
|
|
'name': 'Bar',
|
|
})
|
|
self.run_tasks()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# Restart the cache on the get_triggers method of ir.trigger
|
|
Trigger._get_triggers_cache.clear()
|
|
|
|
@with_transaction()
|
|
def test_on_delete(self):
|
|
'Test on_delete'
|
|
pool = Pool()
|
|
Model = pool.get('ir.model')
|
|
Trigger = pool.get('ir.trigger')
|
|
Triggered = pool.get('test.triggered')
|
|
TriggerLog = pool.get('ir.trigger.log')
|
|
|
|
model, = Model.search([
|
|
('model', '=', 'test.triggered'),
|
|
])
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
|
|
trigger, = Trigger.create([{
|
|
'name': 'Test',
|
|
'model': model.id,
|
|
'on_delete': True,
|
|
'condition': 'true',
|
|
'action': 'test.trigger_action|trigger',
|
|
}])
|
|
|
|
Triggered.delete([triggered])
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
Transaction().delete = {}
|
|
|
|
# Trigger with condition
|
|
condition = PYSONEncoder().encode(
|
|
Eval('self', {}).get('name') == 'Bar')
|
|
Trigger.write([trigger], {
|
|
'condition': condition,
|
|
})
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Bar',
|
|
}])
|
|
|
|
# Matching condition
|
|
Triggered.delete([triggered])
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
Transaction().delete = {}
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Foo',
|
|
}])
|
|
|
|
# Non matching condition
|
|
Triggered.delete([triggered])
|
|
self.assertEqual(TRIGGER_LOGS, [])
|
|
Transaction().delete = {}
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
|
|
# With limit number
|
|
Trigger.write([trigger], {
|
|
'condition': 'true',
|
|
'limit_number': 1,
|
|
})
|
|
Triggered.delete([triggered])
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
Transaction().delete = {}
|
|
# Delete trigger logs because SQLite reuse the same triggered_id
|
|
TriggerLog.delete(TriggerLog.search([
|
|
('trigger', '=', trigger.id),
|
|
]))
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
|
|
# With minimum delay
|
|
Trigger.write([trigger], {
|
|
'limit_number': 0,
|
|
'minimum_time_delay': datetime.timedelta(hours=1),
|
|
})
|
|
Triggered.delete([triggered])
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
Transaction().delete = {}
|
|
|
|
# Restart the cache on the get_triggers method of ir.trigger
|
|
Trigger._get_triggers_cache.clear()
|
|
|
|
@with_transaction()
|
|
def test_on_time(self):
|
|
'Test on_time'
|
|
pool = Pool()
|
|
Model = pool.get('ir.model')
|
|
Trigger = pool.get('ir.trigger')
|
|
Triggered = pool.get('test.triggered')
|
|
TriggerLog = pool.get('ir.trigger.log')
|
|
|
|
model, = Model.search([
|
|
('model', '=', 'test.triggered'),
|
|
])
|
|
|
|
trigger, = Trigger.create([{
|
|
'name': 'Test',
|
|
'model': model.id,
|
|
'on_time': True,
|
|
'condition': 'true',
|
|
'action': 'test.trigger_action|trigger',
|
|
}])
|
|
|
|
triggered, = Triggered.create([{
|
|
'name': 'Test',
|
|
}])
|
|
Trigger.trigger_time()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# Trigger with condition
|
|
condition = PYSONEncoder().encode(
|
|
Eval('self', {}).get('name') == 'Bar')
|
|
Trigger.write([trigger], {
|
|
'condition': condition,
|
|
})
|
|
|
|
# Matching condition
|
|
Triggered.write([triggered], {
|
|
'name': 'Bar',
|
|
})
|
|
Trigger.trigger_time()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# Non matching condition
|
|
Triggered.write([triggered], {
|
|
'name': 'Foo',
|
|
})
|
|
Trigger.trigger_time()
|
|
self.assertEqual(TRIGGER_LOGS, [])
|
|
|
|
# With limit number
|
|
Trigger.write([trigger], {
|
|
'condition': 'true',
|
|
'limit_number': 1,
|
|
})
|
|
Trigger.trigger_time()
|
|
Trigger.trigger_time()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
|
|
# Delete trigger logs of limit number test
|
|
TriggerLog.delete(TriggerLog.search([
|
|
('trigger', '=', trigger.id),
|
|
]))
|
|
|
|
# With minimum delay
|
|
Trigger.write([trigger], {
|
|
'limit_number': 0,
|
|
'minimum_time_delay': datetime.timedelta.max,
|
|
})
|
|
Trigger.trigger_time()
|
|
Trigger.trigger_time()
|
|
self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
Transaction().delete = {}
|
|
|
|
# Delete trigger logs of previous minimum delay test
|
|
TriggerLog.delete(TriggerLog.search([
|
|
('trigger', '=', trigger.id),
|
|
]))
|
|
|
|
Trigger.write([trigger], {
|
|
'minimum_time_delay': datetime.timedelta(seconds=1),
|
|
})
|
|
Trigger.trigger_time()
|
|
|
|
# Make time pass by moving back in time the log creation
|
|
trigger_log = TriggerLog.__table__()
|
|
cursor = Transaction().connection.cursor()
|
|
cursor.execute(*trigger_log.update(
|
|
[trigger_log.create_date],
|
|
[datetime.datetime.now() - datetime.timedelta(days=1)],
|
|
where=trigger_log.record_id == triggered.id))
|
|
|
|
Trigger.trigger_time()
|
|
self.assertEqual(
|
|
TRIGGER_LOGS, [([triggered], trigger), ([triggered], trigger)])
|
|
TRIGGER_LOGS.pop()
|
|
TRIGGER_LOGS.pop()
|
|
Transaction().delete = {}
|
|
|
|
# Restart the cache on the get_triggers method of ir.trigger
|
|
Trigger._get_triggers_cache.clear()
|