Files
tradon/tests/test_cache.py
2025-12-26 13:11:43 +00:00

345 lines
10 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime as dt
import os
import time
import unittest
from trytond import backend
from trytond import cache as cache_mod
from trytond.cache import (
LRUDict, LRUDictTransaction, MemoryCache, freeze, unfreeze)
from trytond.tests.test_tryton import (
DB_NAME, USER, activate_module, with_transaction)
from trytond.transaction import Transaction
cache = MemoryCache('test.cache')
cache_expire = MemoryCache('test.cache_expire', duration=1)
cache_ignored_local_context = MemoryCache(
'test.cache.ignored.local', context_ignored_keys={'ignored'})
cache_ignored_global_context = MemoryCache('test.cache.ignored.global')
class CacheTestCase(unittest.TestCase):
"Test Cache"
@classmethod
def setUpClass(cls):
activate_module('tests')
def testFreeze(self):
"Test freeze"
self.assertEqual(freeze([1, 2, 3]), (1, 2, 3))
self.assertEqual(freeze({
'list': [1, 2, 3],
}),
frozenset([('list', (1, 2, 3))]))
self.assertEqual(freeze({
'dict': {
'inner dict': {
'list': [1, 2, 3],
'string': 'test',
},
}
}),
frozenset([('dict',
frozenset([('inner dict',
frozenset([
('list', (1, 2, 3)),
('string', 'test'),
]))]))]))
def testUnfreeze(self):
"Test unfreeze"
for value, result in [
(freeze([1, 2, 3]), [1, 2, 3]),
(freeze({'dict': {
'inner dict': {
'list': [1, 2, 3],
'string': 'test',
},
},
}),
{'dict': {
'inner dict': {
'list': [1, 2, 3],
'string': 'test',
},
},
}),
]:
with self.subTest(value=value):
self.assertEqual(unfreeze(value), result)
@with_transaction()
def test_ignored_context_key_global(self):
"Test global keys are ignored from context"
with Transaction().set_context(client='foo'):
cache_ignored_global_context.set('key', 0)
with Transaction().set_context(client='bar'):
value = cache_ignored_global_context.get('key')
self.assertEqual(value, 0)
@with_transaction()
def test_ignored_context_key_local(self):
"Test local keys are ignored from context"
with Transaction().set_context(ignored='foo'):
cache_ignored_local_context.set('key', 1)
with Transaction().set_context(ignored='bar'):
value = cache_ignored_local_context.get('key')
self.assertEqual(value, 1)
class MemoryCacheTestCase(unittest.TestCase):
"Test Cache"
@classmethod
def setUpClass(cls):
activate_module('tests')
def setUp(self):
super().setUp()
clear_timeout = cache_mod._clear_timeout
cache_mod._clear_timeout = 1
self.addCleanup(
setattr, cache_mod, '_clear_timeout', clear_timeout)
def tearDown(self):
MemoryCache.drop(DB_NAME)
def wait_cache_listening(self):
pass
def wait_cache_sync(self, after=None):
pass
@with_transaction()
def test_memory_cache_set_get(self):
"Test MemoryCache set/get"
cache.set('foo', 'bar')
self.assertEqual(cache.get('foo'), 'bar')
@with_transaction()
def test_memory_cache_mutable(self):
"Test MemoryCache with mutable value"
value = ['bar']
cache.set('foo', value)
value.remove('bar')
self.assertEqual(cache.get('foo'), ['bar'])
@with_transaction()
def test_memory_cache_drop(self):
"Test MemoryCache drop"
cache.set('foo', 'bar')
MemoryCache.drop(DB_NAME)
self.assertEqual(cache.get('foo'), None)
def test_memory_cache_transactions(self):
"Test MemoryCache with concurrent transactions"
transaction1 = Transaction().start(DB_NAME, USER)
self.wait_cache_listening()
self.addCleanup(transaction1.stop)
cache.set('foo', 'bar')
self.assertEqual(cache.get('foo'), 'bar')
transaction2 = transaction1.new_transaction()
self.addCleanup(transaction2.stop)
cache.clear()
self.assertEqual(cache.get('foo'), None)
cache.set('foo', 'baz')
self.assertEqual(cache.get('foo'), 'baz')
with Transaction().set_current_transaction(transaction1):
self.assertEqual(cache.get('foo'), 'bar')
commit_time = dt.datetime.now()
transaction2.commit()
self.wait_cache_sync(after=commit_time)
self.assertEqual(cache.get('foo'), 'baz')
def test_memory_cache_nested_transactions(self):
"Test MemoryCache with nested transactions"
# Create entry in the cache table to trigger 2 updates
with Transaction().start(DB_NAME, USER):
cache.clear()
# Ensure sync is performed on start
time.sleep(cache_mod._clear_timeout)
with Transaction().start(DB_NAME, USER) as transaction1:
cache.clear()
with transaction1.new_transaction():
cache.clear()
def test_memory_cache_sync(self):
"Test MemoryCache synchronisation"
with Transaction().start(DB_NAME, USER):
cache.clear()
time.sleep(cache_mod._clear_timeout)
last = cache._clean_last
with Transaction().start(DB_NAME, USER):
self.assertGreater(cache._clean_last, last)
def test_memory_cache_old_transaction(self):
"Test old transaction does not fill cache"
transaction1 = Transaction().start(DB_NAME, USER)
self.wait_cache_listening()
self.addCleanup(transaction1.stop)
# Clear cache from new transaction
transaction2 = transaction1.new_transaction()
self.addCleanup(transaction2.stop)
cache.clear()
commit_time = dt.datetime.now()
transaction2.commit()
self.wait_cache_sync(after=commit_time)
# Set value from old transaction
Transaction().set_current_transaction(transaction1)
self.addCleanup(transaction1.stop)
cache.set('foo', 'baz')
# New transaction has still empty cache
transaction3 = transaction1.new_transaction()
self.addCleanup(transaction3.stop)
self.assertEqual(cache.get('foo'), None)
@with_transaction()
def test_memory_cache_expire(self):
"Test expired cache"
cache_expire.set('foo', "bar")
time.sleep(cache_expire.duration.total_seconds())
self.assertEqual(cache_expire.get('foo'), None)
@unittest.skipIf(backend.name == 'sqlite', "SQLite has not channel")
class MemoryCacheChannelTestCase(MemoryCacheTestCase):
"Test Cache with channel"
def setUp(self):
super().setUp()
clear_timeout = cache_mod._clear_timeout
cache_mod._clear_timeout = 0
self.addCleanup(
setattr, cache_mod, '_clear_timeout', clear_timeout)
def wait_cache_sync(self, after=None):
if after is None:
after = dt.datetime.now()
while MemoryCache._clean_last < after:
time.sleep(.01)
def wait_cache_listening(self):
pid = os.getpid()
dbname = Transaction().database.name
listener = MemoryCache._listener.get((pid, dbname))
while (not getattr(listener, 'listening', False)
and listener.is_alive()):
time.sleep(.01)
@unittest.skip("No cache sync on transaction start with channel")
def test_memory_cache_sync(self):
super().test_memory_cache_sync()
class LRUDictTestCase(unittest.TestCase):
"Test LRUDict"
def test_setitem(self):
lru_dict = LRUDict(1)
lru_dict['foo'] = 'foo'
self.assertEqual(len(lru_dict), 1)
lru_dict['bar'] = 'bar'
self.assertEqual(len(lru_dict), 1)
self.assertEqual(lru_dict, {'bar': 'bar'})
def test_update(self):
lru_dict = LRUDict(1)
lru_dict['foo'] = 'foo'
self.assertEqual(len(lru_dict), 1)
lru_dict.update(bar='bar')
lru_dict.update(baz='baz')
self.assertEqual(len(lru_dict), 1)
self.assertEqual(lru_dict, {'baz': 'baz'})
def test_setdefault(self):
lru_dict = LRUDict(1)
lru_dict['foo'] = 'foo'
self.assertEqual(len(lru_dict), 1)
lru_dict.setdefault('bar', 'value')
self.assertEqual(len(lru_dict), 1)
self.assertEqual(lru_dict, {'bar': 'value'})
def test_default_factory(self):
lru_dict = LRUDict(1, default_factory=list)
self.assertEqual(lru_dict['foo'], [])
lru_dict['bar'].append('bar')
self.assertEqual(lru_dict, {'bar': ['bar']})
def test_default_factory_with_key(self):
lru_dict = LRUDict(
1, default_factory=lambda k: k, default_factory_with_key=True)
self.assertEqual(lru_dict['foo'], 'foo')
class LRUDictTransactionTestCase(unittest.TestCase):
"Test LRUDictTransaction"
@classmethod
def setUpClass(cls):
activate_module('tests')
@with_transaction()
def test_init(self):
"Test init set to transaction counter"
lru_dict = LRUDictTransaction(48)
self.assertEqual(lru_dict.counter, Transaction().counter)
@with_transaction()
def test_clear(self):
"Test clear reset counter"
lru_dict = LRUDictTransaction(48)
Transaction().counter += 1
lru_dict.clear()
self.assertEqual(lru_dict.counter, Transaction().counter)
@with_transaction()
def test_refresh(self):
"Test refresh"
lru_dict = LRUDictTransaction(48)
lru_dict['foo'] = 'foo'
lru_dict.refresh()
self.assertEqual(lru_dict, {'foo': 'foo'})
Transaction().counter += 1
lru_dict.refresh()
self.assertEqual(lru_dict, {})
self.assertEqual(lru_dict.counter, Transaction().counter)