Files
Implementation_ITSA/Reference Data/python_project/helpers/tryton_helpers.py
AzureAD\SylvainDUVERNAY 4bac53b01c Add scripts for importing prices and sale contract fees; update purchase fees script
- Implemented `import_prices.py` to import price index values from a CSV file with migration mapping.
- Created `import_sale_fees.py` for importing sale contract line fees, including detailed logging and error handling.
- Modified `import_purchase_fees.py` to change fee type from 'ordered' to 'budgeted' and added fee ID logging.
2026-03-24 14:13:23 +01:00

1165 lines
40 KiB
Python

"""
Shared helper functions for Tryton import scripts
"""
import csv
import psycopg2
from decimal import Decimal
from datetime import datetime
from proteus import config, Model
# ============================================================ Listing Functions ============================================================
def list_available_currencies():
"""List all available currencies for debugging"""
Currency = Model.get('currency.currency')
currencies = Currency.find([])
print("\n" + "="*70)
print("AVAILABLE CURRENCIES")
print("="*70)
print(f"{'Name':<10} {'Code':<10} {'Symbol':<10}")
print("-"*70)
for curr in currencies:
name = curr.name if hasattr(curr, 'name') else 'N/A'
code = str(curr.code) if hasattr(curr, 'code') else 'N/A'
symbol = curr.symbol if hasattr(curr, 'symbol') else 'N/A'
print(f"{name:<10} {code:<10} {symbol:<10}")
print("="*70 + "\n")
return currencies
def list_available_payment_terms():
"""List all available payment terms for debugging"""
PaymentTerm = Model.get('account.invoice.payment_term')
terms = PaymentTerm.find([])
print("="*70)
print("AVAILABLE PAYMENT TERMS")
print("="*70)
if terms:
print(f"{'Name':<50}")
print("-"*70)
for term in terms:
name = term.name if hasattr(term, 'name') else 'N/A'
print(f"{name:<50}")
else:
print("No payment terms found")
print("="*70 + "\n")
return terms
# ============================================================ Model Lookup Functions ============================================================
#===================================== General Lookup Functions =====================================
def find_party_by_name(party_name):
"""
Find a party (typically a supplier, client, service provider...) by its name.
Attempts an exact name match first. If no exact match is found, performs a
case-insensitive 'ilike' lookup. If still not found, a warning is printed
and the function returns None.
Args:
party_name (str): The name of the party to search for. Leading/trailing
whitespace is ignored and empty values return None.
Returns:
object or None: The first matching Tryton `party.party` record if found,
otherwise `None` when no match exists or the input is invalid.
Notes:
- If multiple matches exist the first record returned by Tryton is used.
- This function prints a warning when no party is found.
"""
if not party_name or party_name.strip() == '':
return None
Party = Model.get('party.party')
# Try exact name match
parties = Party.find([('name', '=', party_name)])
if parties:
return parties[0]
# Try case-insensitive match
parties = Party.find([('name', 'ilike', party_name)])
if parties:
return parties[0]
print(f" ⚠ Warning: Party '{party_name}' not found")
return None
def get_party_invoice_address(party):
"""Get invoice address from party"""
if not party:
return None
# Try to get addresses
if hasattr(party, 'addresses') and party.addresses:
# Look for an invoice address first
for address in party.addresses:
if hasattr(address, 'invoice') and address.invoice:
return address
# If no invoice address, return first address
return party.addresses[0]
return None
def find_product_by_code(product_code):
"""
Find a product by its code.
Attempts an exact code match first. If no exact match is found, performs a
case-insensitive 'ilike' lookup. If still not found, a warning is printed
and the function returns None.
Args:
product_code (str): The code of the product to search for. Leading and
trailing whitespace is ignored and empty values return None.
Returns:
object or None: The first matching Tryton `product.product` record if
found; otherwise `None` when no match exists or the input is invalid.
Notes:
- If multiple matches exist the first record returned by Tryton is used.
- This function prints a warning when no product is found.
"""
if not product_code or product_code.strip() == '':
return None
Product = Model.get('product.product')
# Try exact match first
products = Product.find([('code', '=', product_code)])
if products:
return products[0]
# Try case-insensitive match as first fallback
products = Product.find([('code', 'ilike', product_code)])
if products:
return products[0]
# Try by template code as second fallback
Template = Model.get('product.template')
templates = Template.find([('code', '=', product_code)])
if templates and templates[0].products:
print(f" Found product '{product_code}' via template")
return templates[0].products[0]
print(f" ⚠ Warning: Product '{product_code}' not found")
return None
def find_currency_by_code(currency_code):
"""
Find a currency by its ISO code or name.
Performs a case-sensitive search by currency name after uppercasing the
input. Returns the first matching `currency.currency` record. If the input
is empty or no match is found, a warning is printed and the function
returns `None`.
Args:
currency_code (str): The currency code or name to search for. Leading
and trailing whitespace is ignored; empty values return `None`.
Returns:
object or None: The first matching Tryton `currency.currency` record
if found; otherwise `None` when no match exists or the input is
invalid.
Notes:
- This function searches by the `name` field using an uppercased exact
match. Consider expanding to include ISO code or `ilike` searches if
needed in the future.
- If multiple matches exist the first record returned by Tryton is used.
"""
if not currency_code or currency_code.strip() == '':
return None
Currency = Model.get('currency.currency')
# Search by currency name
currencies = Currency.find([('name', '=', currency_code.upper())])
if currencies:
return currencies[0]
print(f" ⚠ Warning: Currency '{currency_code}' not found")
return None
def find_uom_by_code(uom_code):
"""Find unit of measure by code or symbol"""
Uom = Model.get('product.uom')
if not uom_code:
return None
# Try by symbol first (most common: 'kg', 'unit', etc.)
uoms = Uom.find([('symbol', '=', uom_code)])
if uoms:
return uoms[0]
# Try by name
uoms = Uom.find([('name', '=', uom_code)])
if uoms:
print(f" Found UOM '{uom_code}' by name instead of symbol")
return uoms[0]
# Try case-insensitive
all_uoms = Uom.find([])
for uom in all_uoms:
if (hasattr(uom, 'symbol') and uom.symbol and uom.symbol.upper() == uom_code.upper()):
print(f" Found UOM '{uom_code}' (case-insensitive)")
return uom
print(f" ⚠ Warning: UOM '{uom_code}' not found")
return None
def find_location(location):
"""Find stock location by code or name"""
Location = Model.get('stock.location')
if not location:
return None
# Try by code first
locations = Location.find([('code', '=', location)])
if locations:
return locations[0]
# Try by name as fallback
locations = Location.find([('name', '=', location)])
if locations:
print(f" Found location: {location} - ID: {locations[0].id}")
return locations[0]
print(f" ⚠ Warning: Location with code '{location}' not found")
return None
def find_warehouse(warehouse):
"""Find warehouse by code or name"""
Location = Model.get('stock.location')
if not warehouse:
return None
# Try by code first
warehouses = Location.find([
('code', '=', warehouse),
('type', '=', 'warehouse')
])
if warehouses:
return warehouses[0]
# Try by name as fallback
warehouses = Location.find([
('name', '=', warehouse),
('type', '=', 'warehouse')
])
if warehouses:
print(f" Found warehouse '{warehouse}' by name instead of code")
return warehouses[0]
print(f" ⚠ Warning: Warehouse with code '{warehouse}' not found")
return None
def find_payment_term_by_name(payment_term_name):
"""Find payment term by name"""
PaymentTerm = Model.get('account.invoice.payment_term')
if not payment_term_name:
return None
terms = PaymentTerm.find([('name', '=', payment_term_name)])
if terms:
return terms[0]
# Try case-insensitive
all_terms = PaymentTerm.find([])
for term in all_terms:
if term.name.upper() == payment_term_name.upper():
print(f" Found payment term '{payment_term_name}' (case-insensitive)")
return term
print(f" ⚠ Warning: Payment term '{payment_term_name}' not found")
return None
def find_incoterm_by_code(incoterm_code, incoterm_version='2025'):
"""
Find an Incoterm record by code and optional version.
Attempts to find an `incoterm.incoterm` record matching the provided
`incoterm_code` and `incoterm_version`. If an exact version match is not
found, the function falls back to common historical versions ('2020',
'2010') in that order. `incoterm_version` may be supplied as an int or
a string; integers will be converted to strings for comparison.
Args:
incoterm_code (str): The Incoterm code to search for (e.g., 'FOB').
incoterm_version (str|int): The preferred Incoterm version (default
'2025'). If an int is supplied it will be cast to str.
Returns:
object or None: The first matching `incoterm.incoterm` record if found;
otherwise `None` and a warning is printed.
Notes:
- The function prints a warning when no matching Incoterm is found.
- To adjust fallback behavior, update the list of versions checked.
"""
Incoterm = Model.get('incoterm.incoterm')
if not incoterm_code:
return None
# Ensure version is a string
if isinstance(incoterm_version, int):
incoterm_version = str(incoterm_version)
# Try to find incoterm based on specified version
incoterms = Incoterm.find([('code', '=', incoterm_code, ), ('version', '=', incoterm_version)])
if incoterms:
return incoterms[0]
# Try by incoterm 2020 as first fallback
incoterms = Incoterm.find([('code', '=', incoterm_code, ), ('version', '=', '2020')])
if incoterms:
return incoterms[0]
# Try by incoterm 2010 as second fallback
incoterms = Incoterm.find([('code', '=', incoterm_code, ), ('version', '=', '2010')])
if incoterms:
return incoterms[0]
print(f" ⚠ Warning: Incoterm with code '{incoterm_code}' not found")
return None
def find_weight_basis_by_name(weight_basis_abbr):
"""Find weight basis by abbreviation"""
WeightBasis = Model.get('purchase.weight.basis')
if not weight_basis_abbr:
return None
# Try by abbreviation first
WeightBasisCollection = WeightBasis.find([('name', '=', weight_basis_abbr)])
if WeightBasisCollection:
return WeightBasisCollection[0]
print(f" ⚠ Warning: Weight Basis with abbreviation '{weight_basis_abbr}' not found")
return None
def find_contract_line_by_sequence(contract, line_sequence):
"""
Find a contract line within a purchase contract by its sequence number.
Validates the provided contract and attempts to convert the provided
`line_sequence` to an integer. Searches the `lines` iterable on the
contract for a line object whose `sequence` attribute matches the
integer sequence number. Returns the first matching line, or `None` if the
contract is invalid, the sequence cannot be parsed, or no matching line is
found.
Args:
contract (object): A `purchase.purchase` record (or similar) expected
to have a `lines` iterable attribute containing line objects.
line_sequence (int | str): Sequence number to search for. Can be an
integer or a string representation of an integer.
Returns:
object or None: The matching contract line object if found; otherwise
`None`.
Notes:
- Prints a warning when the provided `line_sequence` is invalid or when
no matching line is found.
"""
if not contract or not contract.lines:
return None
try:
sequence_num = int(line_sequence)
except (ValueError, TypeError):
print(f" ⚠ Warning: Invalid line sequence '{line_sequence}'")
return None
# Search through contract lines for matching sequence
for line in contract.lines:
if hasattr(line, 'sequence') and line.sequence == sequence_num:
return line
print(f" ⚠ Warning: Contract line with sequence {sequence_num} not found")
return None
#===================================== Purchase Contract Lookup Functions =====================================
def find_purchase_contract_by_ref(contract_ref):
"""
Find a purchase contract by its reference identifier.
Performs an exact match lookup on the `reference` field of the
`purchase.purchase` model. If the input is empty or no contract is found,
the function prints a warning and returns `None`.
Args:
contract_ref (str): The reference string of the purchase contract.
Leading/trailing whitespace is ignored and empty values return None.
Returns:
object or None: The first matching Tryton `purchase.purchase` record if
found; otherwise `None` when no match exists or the input is invalid.
Notes:
- If multiple matches exist the first record returned by Tryton is used.
- This function prints a warning when no contract is found.
"""
if not contract_ref or contract_ref.strip() == '':
return None
Purchase = Model.get('purchase.purchase')
# Search by reference field
contracts = Purchase.find([('reference', '=', contract_ref)])
if contracts:
return contracts[0]
print(f" ⚠ Warning: Purchase contract with reference '{contract_ref}' not found")
return None
def find_purchase_contract_by_number(contract_number):
"""
Find a purchase contract by its number
Performs an exact match lookup on the `number` of the
`purchase.purchase` model. If the input is empty or no contract is found,
the function prints a warning and returns `None`.
Args:
contract_number (str): The number string of the purchase contract.
Leading/trailing whitespace is ignored and empty values return None.
Returns:
object or None: The first matching Tryton `purchase.purchase` record if
found; otherwise `None` when no match exists or the input is invalid.
Notes:
- If multiple matches exist the first record returned by Tryton is used.
- This function prints a warning when no contract is found.
"""
if not contract_number or contract_number.strip() == '':
print(f" ⚠ Warning: Contract number is empty")
return None
Purchase = Model.get('purchase.purchase')
# Search by number
contracts = Purchase.find([
('number', '=', contract_number.strip())
])
if contracts:
return contracts[0]
print(f" ⚠ Warning: Purchase contract with number '{contract_number}' not found")
return None
#===================================== Sale Contract Lookup Functions =====================================
def find_sale_contract_by_number(contract_number):
"""
Find a sale contract by its number
Performs an exact match lookup on the `number` of the
`sale.sale` model. If the input is empty or no contract is found,
the function prints a warning and returns `None`.
Args:
contract_number (str): The number string of the sale contract.
Leading/trailing whitespace is ignored and empty values return None.
Returns:
object or None: The first matching Tryton `sale.sale` record if
found; otherwise `None` when no match exists or the input is invalid.
Notes:
- If multiple matches exist the first record returned by Tryton is used.
- This function prints a warning when no contract is found.
"""
if not contract_number or contract_number.strip() == '':
print(f" ⚠ Warning: Contract number is empty")
return None
Sale = Model.get('sale.sale')
# Search by number
contracts = Sale.find([
('number', '=', contract_number.strip())
])
# If not found, try by adding "S-" prefix (if not already present)
if not contracts and not contract_number.strip().startswith('S-'):
prefixed_number = f"S-{contract_number.strip()}"
contracts = Sale.find([
('number', '=', prefixed_number)
])
if contracts:
return contracts[0]
print(f" ⚠ Warning: Sale contract with number '{contract_number}' not found")
return None
#===================================== Supplier Functions =====================================
def find_supplier_category():
"""
Retrieve the 'SUPPLIER' party category from the system.
First attempts an exact match on the `name` field for 'SUPPLIER'. If an
exact match is not found, the function falls back to iterating all party
categories and returns the first one whose uppercased `name` equals
'SUPPLIER'. If no matching category is found, a warning is printed and
`None` is returned.
Args:
None
Returns:
object or None: The matching `party.category` record if found; otherwise
`None`.
Notes:
- This helper helps ensure that parties can be categorized as suppliers
without relying on exact case-sensitive persistence of the category
name.
"""
Category = Model.get('party.category')
# Try to find existing SUPPLIER category
categories = Category.find([('name', '=', 'SUPPLIER')])
if categories:
return categories[0]
# Try case-insensitive
all_categories = Category.find([])
for cat in all_categories:
if cat.name.upper() == 'SUPPLIER':
return cat
print(f" ⚠ Warning: SUPPLIER category not found")
return None
def ensure_party_is_supplier(party, auto_enable=True):
"""
Ensure a party has the SUPPLIER category, optionally adding it.
Checks whether the provided `party` record contains the SUPPLIER
category. If the category is missing and `auto_enable` is True, the
function attempts to append the category to the party and save the
record. On success it returns the updated party and True. If
`auto_enable` is False the function leaves the party unchanged and
returns (party, False), printing guidance for manual action.
Args:
party (object): A `party.party` record expected to have a
`categories` collection attribute.
auto_enable (bool): If True (default) attempt to add the SUPPLIER
category when missing; if False do not modify the party and
prompt the user to add the category manually.
Returns:
tuple: (party, bool) where bool is True if the party has the
SUPPLIER category after the call, otherwise False.
Notes:
- Prints informative messages for missing category, permission
issues, and other exceptions. Use `find_supplier_category()` to
retrieve the category object directly if needed.
"""
if not party:
return party, False
# Get the SUPPLIER category
supplier_category = find_supplier_category()
if not supplier_category:
print(f" ✗ Cannot find SUPPLIER category in the system")
return party, False
# Check if party already has SUPPLIER category
has_supplier_category = False
if hasattr(party, 'categories') and party.categories:
for category in party.categories:
if category.id == supplier_category.id:
has_supplier_category = True
break
if has_supplier_category:
return party, True
if not auto_enable:
print(f" ⚠ Party '{party.rec_name}' does not have SUPPLIER category")
print(f" Please manually add SUPPLIER category to this party in Tryton UI")
return party, False
# Try to add SUPPLIER category
print(f" Party '{party.rec_name}' does not have SUPPLIER category - adding it")
try:
# Just append to the original party's categories
party.categories.append(supplier_category)
party.save()
print(f" ✓ SUPPLIER category added to party")
return party, True
except Exception as e:
print(f" ✗ Could not add SUPPLIER category: {e}")
print(f" Error type: {type(e).__name__}")
import traceback
traceback.print_exc()
if 'permission' in str(e).lower() or 'access' in str(e).lower():
print(f" This appears to be a permission issue.")
print(f" Please manually add SUPPLIER category to '{party.rec_name}' in Tryton UI")
return party, False
#===================================== Client Functions =====================================
def find_client_category():
"""
Retrieve the 'CLIENT' party category from the system.
Attempts an exact match on the `name` field for 'CLIENT'. If no exact
match is found, the function falls back to iterating all party categories
and returns the first one whose uppercased `name` equals 'CLIENT'. If no
matching category is found, a warning is printed and `None` is returned.
Args:
None
Returns:
object or None: The matching `party.category` record if found; otherwise
`None` when no CLIENT category exists.
Notes:
- This mirrors `find_supplier_category()` but targets the CLIENT type.
"""
Category = Model.get('party.category')
# Try to find existing CLIENT category
categories = Category.find([('name', '=', 'CLIENT')])
if categories:
return categories[0]
# Try case-insensitive
all_categories = Category.find([])
for cat in all_categories:
if cat.name.upper() == 'CLIENT':
return cat
print(f" ⚠ Warning: CLIENT category not found")
return None
def ensure_party_is_client(party, auto_enable=True):
"""
Ensure a party has the CLIENT category, optionally adding it.
Verifies whether the provided `party` record has the CLIENT category.
If missing and `auto_enable` is True, attempts to append the category and
save the party record. Returns the (possibly modified) party and a boolean
indicating whether the party has the CLIENT category after the call.
Args:
party (object): A `party.party` record expected to have a
`categories` collection attribute.
auto_enable (bool): If True (default) attempt to add the CLIENT
category when missing; if False do not modify the party and
prompt the user to add the category manually.
Returns:
tuple: (party, bool) where bool is True if the party has the
CLIENT category after the call, otherwise False.
Notes:
- Prints informative messages for missing category, permission
issues, and other exceptions. Use `find_client_category()` to
retrieve the category object directly if needed.
"""
if not party:
return party, False
# Get the CLIENT category
client_category = find_client_category()
if not client_category:
print(f" ✗ Cannot find CLIENT category in the system")
return party, False
# Check if party already has CLIENT category
has_client_category = False
if hasattr(party, 'categories') and party.categories:
for category in party.categories:
if category.id == client_category.id:
has_client_category = True
break
if has_client_category:
return party, True
if not auto_enable:
print(f" ⚠ Party '{party.rec_name}' does not have CLIENT category")
print(f" Please manually add CLIENT category to this party in Tryton UI")
return party, False
# Try to add CLIENT category
print(f" Party '{party.rec_name}' does not have CLIENT category - adding it")
try:
# Just append to the original party's categories
party.categories.append(client_category)
party.save()
print(f" ✓ CLIENT category added to party")
return party, True
except Exception as e:
print(f" ✗ Could not add CLIENT category: {e}")
print(f" Error type: {type(e).__name__}")
import traceback
traceback.print_exc()
if 'permission' in str(e).lower() or 'access' in str(e).lower():
print(f" This appears to be a permission issue.")
print(f" Please manually add CLIENT category to '{party.rec_name}' in Tryton UI")
return party, False
#===================================== Analytic Dimension Functions =====================================
def find_or_create_analytic_dimension_value(dimension_name, value_name):
"""
Find or create an analytic dimension value for a given dimension.
Args:
dimension_name: Name of the analytic dimension (e.g., 'Book', 'Strategy')
value_name: Name of the value to find or create (e.g., 'Thailand FY25')
Returns:
The analytic dimension value record or None if dimension doesn't exist
"""
AnalyticDimension = Model.get('analytic.dimension')
AnalyticDimensionValue = Model.get('analytic.dimension.value')
# Find the dimension
dimensions = AnalyticDimension.find([('name', '=', dimension_name)])
if not dimensions:
print(f" ⚠ Analytic dimension '{dimension_name}' not found in system")
return None
dimension = dimensions[0]
# Try to find existing value
existing_values = AnalyticDimensionValue.find([
('dimension', '=', dimension.id),
('name', '=', value_name)
])
if existing_values:
print(f" ✓ Found existing value '{value_name}' for dimension '{dimension_name}'")
return existing_values[0]
# Create new value if it doesn't exist
try:
new_value = AnalyticDimensionValue()
new_value.dimension = dimension
new_value.code = value_name
new_value.name = value_name
new_value.active = True
new_value.save()
print(f" ✓ Created new value '{value_name}' for dimension '{dimension_name}' (ID: {new_value.id})")
return new_value
except Exception as e:
print(f" ✗ Error creating analytic dimension value '{value_name}': {e}")
return None
def link_analytic_dimensions_to_purchase(purchase, dimension_values_dict):
"""
Link analytic dimension values to a purchase contract using Proteus.
Args:
purchase: The purchase record (Proteus object)
dimension_values_dict: Dict with dimension names as keys and value records as values
e.g., {'Book': book_value_record, 'Strategy': strategy_value_record}
Returns:
True if successful, False otherwise
"""
if not dimension_values_dict:
print(f" No analytic dimensions to link")
return True
try:
AnalyticDimensionAssignment = Model.get('analytic.dimension.assignment')
# Remove existing assignments for this purchase to avoid duplicates
existing_assignments = AnalyticDimensionAssignment.find([
('purchase', '=', purchase.id)
])
if existing_assignments:
print(f" Removing {len(existing_assignments)} existing analytic dimension assignment(s)")
for assignment in existing_assignments:
assignment.delete()
# Create new assignments
success_count = 0
for dimension_name, value_record in dimension_values_dict.items():
if value_record is None:
continue
try:
assignment = AnalyticDimensionAssignment()
assignment.purchase = purchase
# Your custom module requires BOTH fields:
assignment.dimension = value_record.dimension # Set the dimension reference
assignment.value = value_record # Set the value reference
assignment.save()
success_count += 1
print(f" ✓ Linked {dimension_name}='{value_record.name}' to purchase {purchase.id}")
except Exception as e:
print(f" ✗ Error linking {dimension_name}: {e}")
import traceback
traceback.print_exc()
if success_count > 0:
print(f" ✓ Successfully linked {success_count} analytic dimension(s)")
return True
else:
print(f" ⚠ No analytic dimensions were linked")
return False
except Exception as e:
print(f" ⚠ Error linking analytic dimensions: {e}")
import traceback
traceback.print_exc()
return False
def link_analytic_dimensions_to_sale(sale, dimension_values_dict):
"""
Link analytic dimension values to a sale contract using Proteus.
Args:
sale: The sale record (Proteus object)
dimension_values_dict: Dict with dimension names as keys and value records as values
e.g., {'Book': book_value_record, 'Strategy': strategy_value_record}
Returns:
True if successful, False otherwise
"""
if not dimension_values_dict:
print(f" No analytic dimensions to link")
return True
try:
AnalyticDimensionAssignment = Model.get('analytic.dimension.assignment')
# Remove existing assignments for this purchase to avoid duplicates
existing_assignments = AnalyticDimensionAssignment.find([
('sale', '=', sale.id)
])
if existing_assignments:
print(f" Removing {len(existing_assignments)} existing analytic dimension assignment(s)")
for assignment in existing_assignments:
assignment.delete()
# Create new assignments
success_count = 0
for dimension_name, value_record in dimension_values_dict.items():
if value_record is None:
continue
try:
assignment = AnalyticDimensionAssignment()
assignment.sale = sale
# Your custom module requires BOTH fields:
assignment.dimension = value_record.dimension # Set the dimension reference
assignment.value = value_record # Set the value reference
assignment.save()
success_count += 1
print(f" ✓ Linked {dimension_name}='{value_record.name}' to sale {sale.id}")
except Exception as e:
print(f" ✗ Error linking {dimension_name}: {e}")
import traceback
traceback.print_exc()
if success_count > 0:
print(f" ✓ Successfully linked {success_count} analytic dimension(s)")
return True
else:
print(f" ⚠ No analytic dimensions were linked")
return False
except Exception as e:
print(f" ⚠ Error linking analytic dimensions: {e}")
import traceback
traceback.print_exc()
return False
#===================================== Fees Functions =====================================
def find_fee_mode_by_name(mode_name):
"""
Map a human-readable fee mode name to the system's internal mode code.
Normalizes the input (trims whitespace and uppercases) and returns a
short code used internally. Known mappings are:
- 'PER QT' -> 'perqt'
- '% COST PRICE' -> 'pcost'
- '% PRICE' -> 'pprice'
- 'LUMP SUM' -> 'lumpsum'
Args:
mode_name (str): Fee mode display name. Leading/trailing whitespace is
ignored and comparison is case-insensitive.
Returns:
str or None: The mapped internal mode string if recognized, otherwise
`None` for unknown or empty inputs.
Notes:
- Prints a warning when an unknown mode is encountered.
"""
if not mode_name or mode_name.strip() == '':
return None
mode_name = mode_name.strip().upper()
if mode_name == 'PER QT':
return 'perqt'
elif mode_name == '% COST PRICE':
return 'pcost'
elif mode_name == '% PRICE':
return 'pprice'
elif mode_name == 'LUMP SUM':
return 'lumpsum'
else:
print(f" ⚠ Warning: Unknown Fee mode: '{mode_name}'")
return None
def find_payable_receivable_by_name(p_r_value):
"""
Determine whether a fee is payable or receivable from a P_R-style value.
Normalizes the input by trimming whitespace and uppercasing it, then maps
common variants to either 'pay' or 'rec'. Recognised payable values include
'PAY', 'PAYABLE', and 'P'; recognised receivable values include 'REC',
'RECEIVABLE', and 'R'. An empty or falsy input returns `None`. Unknown
values print a warning and default to 'pay'.
Args:
p_r_value (str): Raw value from the P_R column (e.g., 'PAY', 'REC').
Returns:
str or None: 'pay' for payable, 'rec' for receivable, or `None` for
empty/invalid inputs.
Notes:
- Prints a warning when encountering an unrecognised value and
defaults to 'pay' to maintain backward compatibility.
"""
if not p_r_value:
return None
p_r_upper = p_r_value.strip().upper()
if p_r_upper in ['PAY', 'PAYABLE', 'P']:
return 'pay'
elif p_r_upper in ['REC', 'RECEIVABLE', 'R']:
return 'rec'
else:
print(f" ⚠ Warning: Unknown payable/receivable value '{p_r_value}' - defaulting to 'PAY'")
return 'pay'
def get_existing_fees_for_line(contract_line):
"""
Retrieve the existing fees associated with a contract line.
Validates the provided `contract_line` and returns its `fees` collection
if present. If the contract line is missing or does not expose a `fees`
attribute, the function returns an empty list to simplify downstream
duplicate checks and iteration.
Args:
contract_line (object): A contract line object expected to have a
`fees` iterable attribute (may be None or empty).
Returns:
list: The fees associated with the contract line, or an empty list if
none exist or the input is invalid.
"""
if not contract_line or not hasattr(contract_line, 'fees'):
return []
return contract_line.fees if contract_line.fees else []
def fee_already_exists(existing_fees, product, supplier, price):
"""
Check whether a fee with the same product, supplier and price already exists.
Iterates `existing_fees` and compares each fee's `product.id`, `party.id`,
and `price` to the provided `product`, `supplier`, and `price` respectively.
The function performs attribute presence checks to avoid AttributeError and
uses exact equality for price comparison.
Args:
existing_fees (iterable): Iterable of fee objects (may be a list or
None). Each fee is expected to expose `product`, `party`, and
`price` attributes.
product (object): Product record with an `id` attribute.
supplier (object): Supplier/party record with an `id` attribute.
price (Decimal | number): Price value to match against fee.price.
Returns:
bool: True if a matching fee exists; False otherwise.
Notes:
- Exact equality is used for price comparison; consider tolerances when
comparing floating point values.
"""
if not existing_fees:
return False
for fee in existing_fees:
if (hasattr(fee, 'product') and fee.product and fee.product.id == product.id and
hasattr(fee, 'party') and fee.party and fee.party.id == supplier.id and
hasattr(fee, 'price') and fee.price == price):
return True
return False
# ============================================================ Field Parsing Functions ============================================================
def parse_decimal(value, field_name):
"""
Parse and validate a numeric value into a Decimal.
Converts `value` to a Decimal using `Decimal(str(value))`. Returns `None`
for empty inputs, common string null markers (e.g. 'NULL', 'NONE', 'N/A'),
or when the value cannot be parsed as a decimal (in which case a warning
is printed referencing `field_name`).
Args:
value (str|int|Decimal|None): The raw value to parse into a Decimal.
field_name (str): Name of the field (used to provide contextual
information in warning messages).
Returns:
Decimal or None: A Decimal instance when parsing succeeds; otherwise
`None` for empty/invalid inputs.
Notes:
- Uses `Decimal(str(value))` to avoid floating-point precision issues.
- Catching `ValueError` and `TypeError` ensures the function is safe to
call on arbitrary input values encountered while importing data.
"""
if not value or value == '':
return None
# Handle 'NULL' or similar string values
if isinstance(value, str) and value.strip().upper() in ['NULL', 'NONE', 'N/A', '']:
return None
try:
return Decimal(str(value))
except (ValueError, TypeError) as e:
print(f" ⚠ Warning: Invalid {field_name} value '{value}' - {e}")
return None
def parse_date(date_str):
"""
Parse a date string into a date object using common formats.
The function trims whitespace and treats common null markers
(e.g. 'NULL', 'NONE', 'N/A', '') as missing values returning `None`.
It then attempts to parse the input using a set of common date formats
and returns a `datetime.date` object for the first matching format.
If no format matches, a warning is printed and `None` is returned.
Args:
date_str (str): Date string to parse. Accepted example formats:
- '%Y-%m-%d' (e.g. '2024-01-15')
- '%d/%m/%Y' (e.g. '15/01/2024')
- '%m/%d/%Y' (e.g. '01/15/2024')
- '%d-%m-%Y' (e.g. '15-01-2024')
- '%Y/%m/%d' (e.g. '2024/01/15')
Returns:
datetime.date or None: Parsed date, or `None` if the input is empty,
represents a null marker, or cannot be parsed.
Notes:
- The order of formats matters for ambiguous dates (e.g. '01/02/2024').
- Extend `date_formats` if additional formats are required.
"""
if not date_str or date_str.strip() == '':
return None
date_str = date_str.strip()
# Handle 'NULL' or similar string values
if date_str.upper() in ['NULL', 'NONE', 'N/A', '']:
return None
# Try different date formats
date_formats = [
'%Y-%m-%d', # 2024-01-15
'%d/%m/%Y', # 15/01/2024
'%m/%d/%Y', # 01/15/2024
'%d-%m-%Y', # 15-01-2024
'%Y/%m/%d', # 2024/01/15
]
for fmt in date_formats:
try:
return datetime.strptime(date_str, fmt).date()
except ValueError:
continue
print(f" ⚠ Warning: Could not parse date '{date_str}'")
return None