- Implemented `import_prices.py` to import price index values from a CSV file with migration mapping. - Created `import_sale_fees.py` for importing sale contract line fees, including detailed logging and error handling. - Modified `import_purchase_fees.py` to change fee type from 'ordered' to 'budgeted' and added fee ID logging.
365 lines
13 KiB
Python
365 lines
13 KiB
Python
import sys
|
|
from pathlib import Path
|
|
|
|
# Add parent directory to Python path so we can import helpers
|
|
parent_dir = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(parent_dir))
|
|
|
|
import csv
|
|
from decimal import Decimal
|
|
from proteus import config, Model
|
|
|
|
from helpers.config import (
|
|
SALE_FEES_CSV,
|
|
connect_to_tryton)
|
|
|
|
from helpers.tryton_helpers import (
|
|
find_party_by_name,
|
|
find_product_by_code,
|
|
find_sale_contract_by_number,
|
|
find_contract_line_by_sequence,
|
|
find_currency_by_code,
|
|
parse_decimal,
|
|
ensure_party_is_supplier,
|
|
find_fee_mode_by_name,
|
|
find_payable_receivable_by_name,
|
|
get_existing_fees_for_line,
|
|
fee_already_exists)
|
|
|
|
|
|
# CSV Configuration
|
|
CSV_FILE_PATH = SALE_FEES_CSV
|
|
|
|
|
|
# Import options
|
|
AUTO_ENABLE_SUPPLIER = True # Set to False to skip auto-enabling supplier flag
|
|
SKIP_NON_SUPPLIERS = False # Set to True to skip parties that aren't suppliers
|
|
|
|
|
|
def import_sale_contract_fees(csv_file):
|
|
"""Import sale contract line fees from CSV"""
|
|
|
|
print(f"{'='*70}")
|
|
print("IMPORTING SALE CONTRACT LINE FEES")
|
|
print(f"{'='*70}\n")
|
|
|
|
# Get models
|
|
try:
|
|
SaleLineFee = Model.get('fee.fee')
|
|
except Exception as e:
|
|
print(f"✗ Error: Could not load fee.fee model - {e}")
|
|
print("Please ensure the model name is correct for your Tryton customization")
|
|
return
|
|
|
|
imported_count = 0
|
|
skipped_count = 0
|
|
error_count = 0
|
|
errors = []
|
|
|
|
try:
|
|
with open(csv_file, 'r', encoding='utf-8-sig') as file:
|
|
reader = csv.DictReader(file)
|
|
|
|
current_contract_number = None
|
|
current_contract = None
|
|
current_line_sequence = None
|
|
current_line = None
|
|
|
|
for row_num, row in enumerate(reader, start=2): # Start at 2 (header is row 1)
|
|
try:
|
|
# Extract data from CSV
|
|
contract_number = row.get('contract_number', '').strip()
|
|
line_sequence = row.get('line_sequence', '').strip()
|
|
product_code = row.get('product', '').strip()
|
|
supplier_name = row.get('supplier', '').strip()
|
|
currency_code = row.get('currency', '').strip()
|
|
p_r_value = row.get('p_r', '').strip()
|
|
mode_name = row.get('mode', '').strip()
|
|
price_value = row.get('price', '').strip()
|
|
unit_value = row.get('unit', '').strip()
|
|
|
|
print(f"Processing row {row_num}: {contract_number} - Line {line_sequence} - {product_code}")
|
|
|
|
# Validate required fields
|
|
if not contract_number:
|
|
print(f" ✗ Skipping: Missing contract_number\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
if not line_sequence:
|
|
print(f" ✗ Skipping: Missing line_sequence\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
if not product_code:
|
|
print(f" ✗ Skipping: Missing product\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Cache contract and line if same as previous row
|
|
if contract_number != current_contract_number:
|
|
current_contract = find_sale_contract_by_number(contract_number)
|
|
current_contract_number = contract_number
|
|
current_line_sequence = None
|
|
current_line = None
|
|
|
|
if not current_contract:
|
|
print(f" ✗ Skipping: Contract not found\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Cache line if same as previous row
|
|
if line_sequence != current_line_sequence:
|
|
current_line = find_contract_line_by_sequence(current_contract, line_sequence)
|
|
current_line_sequence = line_sequence
|
|
|
|
if not current_line:
|
|
print(f" ✗ Skipping: Contract line not found\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Find related records
|
|
product = find_product_by_code(product_code)
|
|
if not product:
|
|
print(f" ✗ Skipping: Product not found\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
supplier = find_party_by_name(supplier_name)
|
|
if not supplier:
|
|
print(f" ✗ Skipping: Supplier not found\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Ensure party has SUPPLIER category
|
|
supplier, is_supplier = ensure_party_is_supplier(supplier, auto_enable=AUTO_ENABLE_SUPPLIER)
|
|
|
|
if not is_supplier:
|
|
if SKIP_NON_SUPPLIERS:
|
|
print(f" ⚠ Skipping sale - party does not have SUPPLIER category\n")
|
|
skipped_count += 1
|
|
current_sale = None
|
|
continue
|
|
else:
|
|
error_msg = f"Row {row_num}: Party '{supplier.rec_name}' does not have SUPPLIER category"
|
|
errors.append(error_msg)
|
|
error_count += 1
|
|
current_sale = None
|
|
continue
|
|
|
|
currency = find_currency_by_code(currency_code)
|
|
if not currency:
|
|
print(f" ✗ Skipping: Currency not found\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Parse price
|
|
price = parse_decimal(price_value, 'price')
|
|
if price is None:
|
|
print(f" ✗ Skipping: Invalid price\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Determine payable/receivable
|
|
payable_receivable = find_payable_receivable_by_name(p_r_value)
|
|
|
|
# Find fee mode
|
|
mode = find_fee_mode_by_name(mode_name)
|
|
|
|
# Check if fee already exists
|
|
existing_fees = get_existing_fees_for_line(current_line)
|
|
if fee_already_exists(existing_fees, product, supplier, price):
|
|
print(f" ○ Fee already exists for this line\n")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Create the fee
|
|
fee = SaleLineFee()
|
|
fee.sale_line = current_line
|
|
fee.product = product
|
|
fee.supplier = supplier
|
|
fee.currency = currency
|
|
fee.price = price
|
|
|
|
# Set type if found and field exists
|
|
if mode and hasattr(fee, 'type'):
|
|
fee.type = 'budgeted' # Assuming all imported fees are 'budgeted'
|
|
|
|
# Set weight_type if found and field exists
|
|
if mode and hasattr(fee, 'weight_type'):
|
|
fee.weight_type = 'brut'
|
|
|
|
# Set p_r (payable or receivable) if found and field exists
|
|
if mode and hasattr(fee, 'p_r'):
|
|
fee.p_r = payable_receivable
|
|
|
|
# Set mode if found and field exists
|
|
if mode and hasattr(fee, 'mode'):
|
|
fee.mode = mode
|
|
|
|
# Set unit if field exists
|
|
if unit_value and hasattr(fee, 'unit'):
|
|
# Try to find the unit
|
|
Unit = Model.get('product.uom')
|
|
units = Unit.find([('symbol', '=', unit_value)])
|
|
if not units:
|
|
units = Unit.find([('name', '=', unit_value)])
|
|
if units:
|
|
fee.unit = units[0]
|
|
|
|
# Save the fee
|
|
fee.save()
|
|
|
|
print(f" ✓ Fee created successfully")
|
|
print(f" Fee ID: {fee.id}")
|
|
print(f" Product: {product.rec_name}")
|
|
print(f" Supplier: {supplier.rec_name}")
|
|
print(f" Price: {price} {currency.code}")
|
|
print(f" Type: {payable_receivable}")
|
|
print()
|
|
|
|
imported_count += 1
|
|
|
|
except Exception as e:
|
|
error_msg = f"Row {row_num} - {contract_number}: {str(e)}"
|
|
errors.append(error_msg)
|
|
error_count += 1
|
|
print(f"✗ Error on row {row_num}: {e}\n")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
# Summary
|
|
print(f"{'='*70}")
|
|
print("IMPORT SUMMARY")
|
|
print(f"{'='*70}")
|
|
print(f"Successfully imported: {imported_count} fees")
|
|
print(f"Skipped (missing data or already exist): {skipped_count} fees")
|
|
print(f"Errors: {error_count}")
|
|
|
|
if errors:
|
|
print(f"\nError details:")
|
|
for error in errors:
|
|
print(f" - {error}")
|
|
|
|
print(f"\n{'='*70}")
|
|
|
|
except FileNotFoundError:
|
|
print(f"✗ Error: CSV file not found at {csv_file}")
|
|
print(f"Please update CSV_FILE_PATH in the script with the correct path.")
|
|
except Exception as e:
|
|
print(f"✗ Fatal error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
def verify_import():
|
|
"""Verify imported sale contract fees"""
|
|
|
|
print(f"\n{'='*70}")
|
|
print("VERIFICATION - Sale Contract Line Fees")
|
|
print(f"{'='*70}\n")
|
|
|
|
try:
|
|
SaleLineFee = Model.get('fee.fee')
|
|
|
|
# Find all fees (or limit to recently created ones)
|
|
fees = SaleLineFee.find([], order=[('id', 'DESC')])
|
|
|
|
if fees:
|
|
print(f"Found {len(fees)} fees (showing last 10):\n")
|
|
print(f"{'ID':<8} {'Contract':<15} {'Product':<25} {'Supplier':<25} {'Price':<12} {'Type':<12}")
|
|
print("-" * 105)
|
|
|
|
for fee in fees[:10]: # Show last 10 created
|
|
fee_id = fee.id
|
|
|
|
# Get contract number
|
|
contract_number = 'N/A'
|
|
if hasattr(fee, 'line') and fee.line:
|
|
line = fee.line
|
|
if hasattr(line, 'sale') and line.sale:
|
|
contract = line.sale
|
|
if hasattr(contract, 'number') and contract.number:
|
|
contract_number = str(contract.number)[:14]
|
|
|
|
product = fee.product.rec_name[:24] if hasattr(fee, 'product') and fee.product else 'N/A'
|
|
supplier = fee.supplier.rec_name[:24] if hasattr(fee, 'supplier') and fee.supplier else 'N/A'
|
|
price = f"{fee.price:.2f}" if hasattr(fee, 'price') and fee.price else 'N/A'
|
|
|
|
# Get type (payable/receivable)
|
|
fee_type = 'N/A'
|
|
if hasattr(fee, 'type'):
|
|
fee_type = fee.type
|
|
elif hasattr(fee, 'payable_receivable'):
|
|
fee_type = fee.payable_receivable
|
|
|
|
print(f"{fee_id:<8} {contract_number:<15} {product:<25} {supplier:<25} {price:<12} {fee_type:<12}")
|
|
else:
|
|
print("No fees found")
|
|
|
|
print()
|
|
|
|
except Exception as e:
|
|
print(f"✗ Error during verification: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
def list_sale_contracts():
|
|
"""List sale contracts for debugging"""
|
|
Sale = Model.get('sale.sale')
|
|
|
|
print(f"\n{'='*70}")
|
|
print("AVAILABLE SALE CONTRACTS (first 20)")
|
|
print(f"{'='*70}")
|
|
|
|
contracts = Sale.find([], order=[('id', 'DESC')], limit=20)
|
|
|
|
if contracts:
|
|
print(f"{'ID':<8} {'Number':<20} {'Party':<30} {'State':<12}")
|
|
print("-" * 70)
|
|
|
|
for contract in contracts:
|
|
contract_id = contract.id
|
|
number = contract.number[:19] if contract.number else 'N/A'
|
|
party = contract.party.rec_name[:29] if contract.party else 'N/A'
|
|
state = contract.state if contract.state else 'N/A'
|
|
|
|
print(f"{contract_id:<8} {number:<20} {party:<30} {state:<12}")
|
|
|
|
# Show number of lines
|
|
if hasattr(contract, 'lines') and contract.lines:
|
|
print(f" Lines: {len(contract.lines)}")
|
|
else:
|
|
print("No sale contracts found")
|
|
|
|
print(f"{'='*70}\n")
|
|
|
|
|
|
def main():
|
|
print("="*70)
|
|
print("TRYTON SALE CONTRACT FEE IMPORT SCRIPT")
|
|
print("Using Proteus with XML-RPC Connection")
|
|
print("="*70)
|
|
print()
|
|
|
|
# Connect to Tryton using XML-RPC
|
|
if not connect_to_tryton():
|
|
return 1
|
|
|
|
# Optional: List sale contracts for debugging
|
|
# Uncomment the following line to see available contracts
|
|
# list_sale_contracts()
|
|
|
|
# Import sale contract fees
|
|
import_sale_contract_fees(CSV_FILE_PATH)
|
|
|
|
# Verify import
|
|
verify_import()
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
exit(main())
|