Files
tradon/modules/account/period.py
2025-12-26 13:11:43 +00:00

418 lines
16 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from trytond.cache import Cache
from trytond.const import OPERATORS
from trytond.i18n import gettext
from trytond.model import Index, ModelSQL, ModelView, Workflow, fields
from trytond.model.exceptions import AccessError
from trytond.pool import Pool
from trytond.pyson import Eval, Id
from trytond.tools import grouped_slice
from trytond.transaction import Transaction
from .exceptions import (
ClosePeriodError, PeriodDatesError, PeriodNotFoundError,
PeriodSequenceError)
# Field states shared by the period fields below: they become read-only as
# soon as the period is no longer in the 'open' state.
_STATES = {'readonly': Eval('state') != 'open'}
class Period(Workflow, ModelSQL, ModelView):
    'Period'
    # NOTE(review): the one-word docstring above is kept verbatim on purpose;
    # presumably the framework reads it as the model description -- confirm
    # before expanding it.
    #
    # A period is a date range attached to a fiscal year. Its workflow is
    # open -> closed -> locked, and a closed period can be re-opened.
    __name__ = 'account.period'

    name = fields.Char('Name', required=True)
    start_date = fields.Date('Starting Date', required=True, states=_STATES,
        domain=[('start_date', '<=', Eval('end_date', None))])
    end_date = fields.Date('Ending Date', required=True, states=_STATES,
        domain=[('end_date', '>=', Eval('start_date', None))])
    fiscalyear = fields.Many2One(
        'account.fiscalyear', "Fiscal Year", required=True, states=_STATES)
    state = fields.Selection([
            ('open', 'Open'),
            ('closed', 'Closed'),
            ('locked', 'Locked'),
            ], 'State', readonly=True, required=True, sort=False)
    # Optional: when empty, `post_move_sequence_used` falls back to the
    # sequence of the fiscal year.
    post_move_sequence = fields.Many2One('ir.sequence', 'Post Move Sequence',
        domain=[
            ('sequence_type', '=',
                Id('account', 'sequence_type_account_move')),
            ['OR',
                ('company', '=', None),
                ('company', '=', Eval('company', -1)),
                ],
            ])
    type = fields.Selection([
            ('standard', 'Standard'),
            ('adjustment', 'Adjustment'),
            ], 'Type', required=True,
        states=_STATES)
    # The company is not stored: it is read from (and searched through) the
    # fiscal year.
    company = fields.Function(fields.Many2One('company.company', 'Company',),
        'on_change_with_company', searcher='search_company')
    icon = fields.Function(fields.Char("Icon"), 'get_icon')
    # Cache used by `find`; cleared by create, write and delete.
    _find_cache = Cache(__name__ + '.find', context=False)

    @classmethod
    def __setup__(cls):
        "Set up access, SQL index, default order, transitions and buttons."
        super().__setup__()
        t = cls.__table__()
        cls.__access__.add('fiscalyear')
        cls._sql_indexes.add(
            Index(
                t,
                (t.start_date, Index.Range()),
                (t.end_date, Index.Range()),
                order='DESC'))
        cls._order.insert(0, ('start_date', 'DESC'))
        cls._transitions |= {
            ('open', 'closed'),
            ('closed', 'locked'),
            ('closed', 'open'),
            }
        cls._buttons.update({
                'close': {
                    'invisible': Eval('state') != 'open',
                    'depends': ['state'],
                    },
                'reopen': {
                    'invisible': Eval('state') != 'closed',
                    'depends': ['state'],
                    },
                'lock_': {
                    'invisible': Eval('state') != 'closed',
                    'depends': ['state'],
                    },
                })

    @classmethod
    def __register__(cls, module):
        "Register the table and migrate the old 'close' state value."
        cursor = Transaction().connection.cursor()
        t = cls.__table__()
        super().__register__(module)

        # Migration from 6.8: rename state close to closed
        cursor.execute(
            *t.update([t.state], ['closed'], where=t.state == 'close'))

    @staticmethod
    def default_state():
        "Return the default state: 'open'."
        return 'open'

    @staticmethod
    def default_type():
        "Return the default type: 'standard'."
        return 'standard'

    @fields.depends('fiscalyear', '_parent_fiscalyear.company')
    def on_change_with_company(self, name=None):
        "Return the company of the fiscal year, or None without fiscal year."
        return self.fiscalyear.company if self.fiscalyear else None

    @classmethod
    def search_company(cls, name, clause):
        "Translate a clause on company into a clause through fiscalyear."
        return [('fiscalyear.' + clause[0],) + tuple(clause[1:])]

    def get_icon(self, name):
        "Return the icon name matching the state (None for another state)."
        return {
            'open': 'tryton-account-open',
            'closed': 'tryton-account-close',
            'locked': 'tryton-account-block',
            }.get(self.state)

    @classmethod
    def validate_fields(cls, periods, field_names):
        "Run the date, fiscal year, move and sequence checks on the periods."
        super().validate_fields(periods, field_names)
        cls.check_dates(periods, field_names)
        cls.check_fiscalyear_dates(periods, field_names)
        cls.check_move_dates(periods, field_names)
        cls.check_post_move_sequence(periods, field_names)

    @classmethod
    def check_dates(cls, periods, field_names=None):
        '''
        Ensure standard periods of the same fiscal year do not overlap.

        The check is skipped when field_names is given and contains none of
        the fields the check depends on. Periods whose type is not
        'standard' are ignored.

        Raises PeriodDatesError naming the two overlapping periods.
        '''
        if field_names and not (
                field_names & {
                    'start_date', 'end_date', 'fiscalyear', 'type'}):
            return
        transaction = Transaction()
        connection = transaction.connection
        cls.lock()
        table = cls.__table__()
        cursor = connection.cursor()
        for period in periods:
            if period.type != 'standard':
                continue
            # Another standard period of the same fiscal year overlaps when
            # it contains the start date, contains the end date, or lies
            # entirely inside this period.
            cursor.execute(*table.select(table.id,
                    where=(((table.start_date <= period.start_date)
                            & (table.end_date >= period.start_date))
                        | ((table.start_date <= period.end_date)
                            & (table.end_date >= period.end_date))
                        | ((table.start_date >= period.start_date)
                            & (table.end_date <= period.end_date)))
                    & (table.fiscalyear == period.fiscalyear.id)
                    & (table.type == 'standard')
                    & (table.id != period.id)))
            period_id = cursor.fetchone()
            if period_id:
                overlapping_period = cls(period_id[0])
                raise PeriodDatesError(
                    gettext('account.msg_period_overlap',
                        first=period.rec_name,
                        second=overlapping_period.rec_name))

    @classmethod
    def check_fiscalyear_dates(cls, periods, field_names=None):
        '''
        Ensure each period lies within the dates of its fiscal year.

        Raises PeriodDatesError for the first period that starts before or
        ends after its fiscal year.
        '''
        if field_names and not (
                field_names & {
                    'start_date', 'end_date', 'fiscalyear'}):
            return
        for period in periods:
            fiscalyear = period.fiscalyear
            if (period.start_date < fiscalyear.start_date
                    or period.end_date > fiscalyear.end_date):
                raise PeriodDatesError(
                    gettext('account.msg_period_fiscalyear_dates',
                        period=period.rec_name,
                        fiscalyear=fiscalyear.rec_name))

    @classmethod
    def check_move_dates(cls, periods, field_names=None):
        '''
        Ensure no move of a period is dated outside the period.

        Raises PeriodDatesError for the first offending move found.
        '''
        pool = Pool()
        Move = pool.get('account.move')
        Lang = pool.get('ir.lang')
        if field_names and not (field_names & {'start_date', 'end_date'}):
            return
        lang = Lang.get()
        for sub_periods in grouped_slice(periods):
            # One search per slice: any move of one of these periods dated
            # before its start or after its end.
            domain = ['OR']
            for period in sub_periods:
                domain.append([
                        ('period', '=', period.id),
                        ['OR',
                            ('date', '<', period.start_date),
                            ('date', '>', period.end_date),
                            ],
                        ])
            moves = Move.search(domain, limit=1)
            if moves:
                move, = moves
                raise PeriodDatesError(
                    gettext('account.msg_period_move_dates',
                        period=move.period.rec_name,
                        move=move.rec_name,
                        move_date=lang.strftime(move.date)))

    @classmethod
    def check_post_move_sequence(cls, periods, field_names=None):
        '''
        Ensure a post move sequence is not shared between fiscal years.

        Raises PeriodSequenceError when a period of another fiscal year uses
        the same post move sequence.
        '''
        if field_names and not (
                field_names & {'post_move_sequence', 'fiscalyear'}):
            return
        for period in periods:
            if not period.post_move_sequence:
                continue
            # Keep the result under its own name: the list being iterated
            # must not be rebound inside the loop.
            others = cls.search([
                    ('post_move_sequence', '=', period.post_move_sequence.id),
                    ('fiscalyear', '!=', period.fiscalyear.id),
                    ])
            if others:
                raise PeriodSequenceError(
                    gettext('account.msg_period_same_sequence',
                        first=period.rec_name,
                        second=others[0].rec_name))

    @classmethod
    def find(cls, company, date=None, test_state=True):
        '''
        Return the standard period of the company containing the date.

        company may be a record, an id or None. When date is empty, today's
        date (computed with the company in the context) is used.
        If test_state is true, only a period in state 'open' is accepted.

        Raises PeriodNotFoundError when there is no period for the date, or
        when test_state is true and the period found is not open.
        '''
        pool = Pool()
        Date = pool.get('ir.date')
        Lang = pool.get('ir.lang')
        Company = pool.get('company.company')

        company_id = int(company) if company is not None else None
        if not date:
            with Transaction().set_context(company=company_id):
                date = Date.today()

        key = (company_id, date, test_state)
        # -1 means "not in the cache"; a cached None means "no period".
        period = cls._find_cache.get(key, -1)
        if period is not None and period < 0:
            clause = [
                ('start_date', '<=', date),
                ('end_date', '>=', date),
                ('fiscalyear.company', '=', company_id),
                ('type', '=', 'standard'),
                ]
            periods = cls.search(
                clause, order=[('start_date', 'DESC')], limit=1)
            if periods:
                period, = periods
            else:
                period = None
            cls._find_cache.set(key, int(period) if period else None)
        elif period is not None:
            period = cls(period)

        found = period and (not test_state or period.state == 'open')
        if not found:
            lang = Lang.get()
            if company is not None and not isinstance(company, Company):
                company = Company(company)
            if not period:
                raise PeriodNotFoundError(
                    gettext('account.msg_no_period_date',
                        date=lang.strftime(date),
                        company=company.rec_name if company else ''))
            else:
                raise PeriodNotFoundError(
                    gettext('account.msg_no_open_period_date',
                        date=lang.strftime(date),
                        period=period.rec_name,
                        company=company.rec_name if company else ''))
        else:
            return period

    @classmethod
    def search(cls, args, offset=0, limit=None, order=None, count=False,
            query=False):
        '''
        Search periods, resolving date clauses that refer to another period.

        A clause on start_date or end_date whose value is a list or tuple
        (period id, field name) is rewritten to compare against the value of
        that field on that period. When the period id is empty the clause is
        replaced by ('id', '!=', '0').

        The domain received is never modified, not even its nested lists: a
        rewritten copy is passed to the parent implementation.
        '''
        def is_clause(item):
            # xmlrpc and pyson do not handle tuples, so a clause may also
            # come in as a list with an operator in second position
            return (isinstance(item, tuple)
                or (isinstance(item, list) and len(item) > 2
                    and item[1] in OPERATORS))

        def process_args(domain):
            processed = []
            for item in domain:
                if (is_clause(item)
                        and item[0] in ('start_date', 'end_date')
                        and isinstance(item[2], (list, tuple))):
                    reference = item[2]
                    if not reference[0]:
                        item = ('id', '!=', '0')
                    else:
                        period = cls(reference[0])
                        item = (item[0], item[1],
                            getattr(period, reference[1]))
                elif isinstance(item, list):
                    # Recurse into a copy so the caller's nested lists are
                    # left untouched
                    item = process_args(item)
                processed.append(item)
            return processed

        return super().search(process_args(args), offset=offset, limit=limit,
            order=order, count=count, query=query)

    @classmethod
    def create(cls, vlist):
        '''
        Create periods and clear the find cache.

        When a fiscal year is given and no post move sequence is set, the
        sequence of the fiscal year is used.

        Raises AccessError when the fiscal year is not open.
        '''
        FiscalYear = Pool().get('account.fiscalyear')
        vlist = [x.copy() for x in vlist]
        for vals in vlist:
            if vals.get('fiscalyear'):
                fiscalyear = FiscalYear(vals['fiscalyear'])
                if fiscalyear.state != 'open':
                    raise AccessError(
                        gettext('account.msg_create_period_closed_fiscalyear',
                            fiscalyear=fiscalyear.rec_name))
                if not vals.get('post_move_sequence'):
                    vals['post_move_sequence'] = (
                        fiscalyear.post_move_sequence.id)
        periods = super().create(vlist)
        cls._find_cache.clear()
        return periods

    @classmethod
    def write(cls, *args):
        '''
        Write on periods and clear the find cache.

        Raises AccessError when a period is set to 'open' while its fiscal
        year is not open, or when the post move sequence of a period that
        already has posted moves is replaced by another sequence.
        '''
        Move = Pool().get('account.move')
        actions = iter(args)
        args = []
        for periods, values in zip(actions, actions):
            if values.get('state') == 'open':
                for period in periods:
                    if period.fiscalyear.state != 'open':
                        raise AccessError(
                            gettext(
                                'account.msg_open_period_closed_fiscalyear',
                                period=period.rec_name,
                                fiscalyear=period.fiscalyear.rec_name))
            if values.get('post_move_sequence'):
                for period in periods:
                    if (period.post_move_sequence
                            and period.post_move_sequence.id
                            != values['post_move_sequence']):
                        # Only existence matters, one record is enough
                        if Move.search([
                                    ('period', '=', period.id),
                                    ('state', '=', 'posted'),
                                    ], limit=1):
                            raise AccessError(
                                gettext('account'
                                    '.msg_change_period_post_move_sequence',
                                    period=period.rec_name))
            args.extend((periods, values))
        super().write(*args)
        cls._find_cache.clear()

    @classmethod
    def delete(cls, periods):
        "Delete the periods and clear the find cache."
        super().delete(periods)
        cls._find_cache.clear()

    @classmethod
    @ModelView.button
    @Workflow.transition('closed')
    def close(cls, periods):
        '''
        Close the periods and their journal periods.

        Raises ClosePeriodError when an account whose end date falls within
        a period still has a balance, or when a move of the periods is not
        posted.
        '''
        pool = Pool()
        JournalPeriod = pool.get('account.journal.period')
        Move = pool.get('account.move')
        Account = pool.get('account.account')
        transaction = Transaction()

        # Lock period and move to be sure no new record will be created
        JournalPeriod.lock()
        Move.lock()

        for period in periods:
            # The balance is read with the period's fiscal year and end date
            # in the context
            with transaction.set_context(
                    fiscalyear=period.fiscalyear.id, date=period.end_date,
                    cumulate=True, journal=None):
                for account in Account.search([
                            ('company', '=', period.company.id),
                            ('end_date', '>=', period.start_date),
                            ('end_date', '<=', period.end_date),
                            ]):
                    if account.balance:
                        raise ClosePeriodError(
                            gettext('account.'
                                'msg_close_period_inactive_accounts',
                                account=account.rec_name,
                                period=period.rec_name))

        unposted_moves = Move.search([
                ('period', 'in', [p.id for p in periods]),
                ('state', '!=', 'posted'),
                ], limit=1)
        if unposted_moves:
            unposted_move, = unposted_moves
            raise ClosePeriodError(
                gettext('account.msg_close_period_non_posted_moves',
                    period=unposted_move.period.rec_name,
                    moves=unposted_move.rec_name))
        journal_periods = JournalPeriod.search([
                ('period', 'in', [p.id for p in periods]),
                ])
        JournalPeriod.close(journal_periods)

    @classmethod
    @ModelView.button
    @Workflow.transition('open')
    def reopen(cls, periods):
        "Re-open period"
        pass

    @classmethod
    @ModelView.button
    @Workflow.transition('locked')
    def lock_(cls, periods):
        "Lock period"
        pass

    @property
    def post_move_sequence_used(self):
        "Return the period's post move sequence or the fiscal year's one."
        return self.post_move_sequence or self.fiscalyear.post_move_sequence