Files
AzureAD\SylvainDUVERNAY 4bac53b01c Add scripts for importing prices and sale contract fees; update purchase fees script
- Implemented `import_prices.py` to import price index values from a CSV file with migration mapping.
- Created `import_sale_fees.py` for importing sale contract line fees, including detailed logging and error handling.
- Modified `import_purchase_fees.py` to change fee type from 'ordered' to 'budgeted' and added fee ID logging.
2026-03-24 14:13:23 +01:00

301 lines
12 KiB
Python

# Add parent directory to Python path so we can import helpers
import sys
from pathlib import Path
parent_dir = Path(__file__).parent.parent
sys.path.insert(0, str(parent_dir))
import csv
from decimal import Decimal
from proteus import Model
from helpers.config import (
PRICES_CSV,
connect_to_tryton,
DB_CONFIG,
)
from helpers.tryton_helpers import (
parse_decimal,
parse_date,
)
from helpers.migration_mapping import MigrationMapper
# ---------------------------------------------------------------------------
# CSV Configuration
# ---------------------------------------------------------------------------
# Path of the CSV file to import; taken from PRICES_CSV in helpers/config.py.
CSV_FILE_PATH = PRICES_CSV
# Set to True to skip records that already exist (same price_index + price_date).
# When enabled, import_prices() searches price.price_value for a record with the
# same price index and date before creating a new one.
SKIP_DUPLICATES = True
# ---------------------------------------------------------------------------
# Main import function
# ---------------------------------------------------------------------------
def _csv_cell(row, column):
    """Return the raw text of *column* in *row*, or '' when the cell is missing.

    csv.DictReader fills cells that are absent from a short row with None
    (its default ``restval``), so ``row.get(column, '')`` can still return
    None; this helper normalizes that case to an empty string.
    """
    return row.get(column) or ''


def import_prices(csv_file):
    """Import price index values from a CSV file with migration mapping tracking.

    Each CSV row is turned into one ``price.price_value`` record attached to
    the ``price.price`` index whose ``price_index`` matches the row's
    ``price_index`` column. Rows are skipped when their optional ``source_id``
    is already present in the migration mapping, or (when SKIP_DUPLICATES is
    True) when a record with the same index and date already exists.

    Row-level failures are printed, collected and reported in the summary
    without stopping the import. A missing file or any other fatal error is
    printed and swallowed; the function never raises to its caller.

    Args:
        csv_file: Path of the CSV file to read (UTF-8, optional BOM).

    Returns:
        None. Progress and the final summary are written to stdout.
    """
    import traceback

    PriceIndex = Model.get('price.price')  # table: price_price
    PriceRecord = Model.get('price.price_value')  # table: price_price_value
    banner = '=' * 70

    print(banner)
    print("IMPORTING PRICES FROM CSV")
    print(f"{banner}\n")
    print(f"Reading from: {csv_file}\n")

    imported_count = 0
    skipped_count = 0
    error_count = 0
    errors = []

    # Build lookup: name -> PriceIndex instance (avoid repeated DB calls)
    print("Loading price indexes...")
    all_indexes = PriceIndex.find([])
    index_map = {pi.price_index: pi for pi in all_indexes}
    print(f" Found {len(index_map)} price indexes in database\n")

    # Collect mappings for batch insert at the end
    price_mappings = []

    try:
        with MigrationMapper(DB_CONFIG) as mapper:
            try:
                # newline='' lets the csv module handle embedded newlines in
                # quoted fields itself, as the csv documentation requires.
                with open(csv_file, 'r', encoding='utf-8-sig', newline='') as file:
                    reader = csv.DictReader(file)
                    for row_num, row in enumerate(reader, start=1):
                        # Use source_id from CSV if present (optional column).
                        # _csv_cell keeps a short row (None cells) from
                        # raising here, outside the per-row error handler.
                        source_id = _csv_cell(row, 'source_id').strip()
                        index_name = _csv_cell(row, 'price_index').strip()
                        raw_date = _csv_cell(row, 'price_date')
                        try:
                            print(banner)
                            print(f"Row {row_num}: {index_name} / {raw_date.strip()}")
                            print(banner)

                            # -- Skip check via migration mapper ------------
                            if source_id:
                                existing_tryton_id = mapper.get_tryton_id('price_value', source_id)
                                if existing_tryton_id:
                                    print(f" ⏭ Already imported (Source ID: {source_id} -> Tryton ID: {existing_tryton_id})\n")
                                    skipped_count += 1
                                    continue

                            # -- Lookup price index -------------------------
                            if not index_name:
                                raise ValueError("price_index is empty")
                            if index_name not in index_map:
                                raise ValueError(f"price_index '{index_name}' not found in database")
                            price_index = index_map[index_name]

                            # -- Parse fields -------------------------------
                            price_date = parse_date(raw_date)
                            high_price = parse_decimal(_csv_cell(row, 'high_price'), 'high_price')
                            low_price = parse_decimal(_csv_cell(row, 'low_price'), 'low_price')
                            open_price = parse_decimal(_csv_cell(row, 'open_price'), 'open_price')
                            price_value = parse_decimal(_csv_cell(row, 'price_value'), 'price_value')
                            if not price_date:
                                raise ValueError(f"Invalid or empty price_date: '{row.get('price_date')}'")

                            # -- Duplicate check (index + date) -------------
                            if SKIP_DUPLICATES:
                                existing = PriceRecord.find([
                                    ('price', '=', price_index.id),
                                    ('price_date', '=', price_date),
                                ])
                                if existing:
                                    print(f" ⏭ Duplicate skipped ({index_name} / {price_date})\n")
                                    skipped_count += 1
                                    continue

                            # -- Create record ------------------------------
                            print(" Creating price record...")
                            record = PriceRecord()
                            record.price = price_index
                            record.price_date = price_date
                            record.high_price = high_price
                            record.low_price = low_price
                            record.open_price = open_price
                            record.price_value = price_value
                            record.active = True
                            record.save()
                            print(f" ✓ Created (ID: {record.id})")
                            print(f" Index : {index_name}")
                            print(f" Date : {price_date}")
                            print(f" High : {high_price} Low: {low_price} Open: {open_price} Value: {price_value}")

                            # -- Save migration mapping ---------------------
                            if source_id:
                                price_mappings.append({
                                    'object_type': 'price_value',
                                    'source_id': source_id,
                                    'tryton_model': 'price.price_value',
                                    'tryton_id': record.id,
                                    'recon_key': f"{index_name}-{price_date}",
                                })
                            imported_count += 1
                            print()
                        except Exception as e:
                            # Row-level boundary: record the failure and keep
                            # importing the remaining rows.
                            error_msg = f"Row {row_num} - {index_name or 'Unknown'}: {str(e)}"
                            errors.append(error_msg)
                            error_count += 1
                            print(f" ✗ Error on row {row_num}: {e}\n")
                            traceback.print_exc()
            finally:
                # -- Batch save all migration mappings ----------------------
                # Done in ``finally`` so that records already created keep
                # their mapping even if reading the CSV fails part-way.
                if price_mappings:
                    print(f"\n{banner}")
                    print("SAVING MIGRATION MAPPINGS")
                    print(f"{banner}\n")
                    print(f"Saving {len(price_mappings)} price mappings...")
                    mapper.save_mappings_batch(price_mappings)
                    print("✓ Price mappings saved\n")

            # -- Summary ----------------------------------------------------
            print(banner)
            print("IMPORT SUMMARY")
            print(banner)
            print(f"Successfully imported : {imported_count}")
            print(f"Skipped (duplicates) : {skipped_count}")
            print(f"Errors : {error_count}")
            if price_mappings:
                print(f"Migration mappings : {len(price_mappings)}")
            if errors:
                print("\nError details:")
                for error in errors:
                    print(f" - {error}")
            print(f"\n{banner}")
    except FileNotFoundError:
        print(f"✗ Error: CSV file not found at {csv_file}")
        print("Please update PRICES_CSV in helpers/config.py with the correct path.")
    except Exception as e:
        print(f"✗ Fatal error: {e}")
        traceback.print_exc()
# ---------------------------------------------------------------------------
# Verify import
# ---------------------------------------------------------------------------
def verify_import():
    """Display the 10 most recent price records with their migration source IDs.

    Records are read from ``price.price_value`` ordered by descending ID. For
    each displayed record, the most recently written ``price_value`` row in
    ``public.os_migration_mapping`` is looked up on a best-effort basis: when
    no mapping exists or the lookup fails, the source ID is shown as ``N/A``
    and any lookup failures are listed after the table.

    Returns:
        None. The report is written to stdout.
    """
    PriceRecord = Model.get('price.price_value')
    print(f"\n{'='*70}")
    print("VERIFICATION - Price Records")
    print(f"{'='*70}\n")
    records = PriceRecord.find([], order=[('id', 'DESC')])
    if records:
        print(f"Found {len(records)} price records (showing last 10):\n")
        print(
            f"{'ID':<8} {'Index':<30} {'Date':<12} {'High':>10} {'Low':>10}"
            f" {'Open':>10} {'Value':>10} {'Source ID':<15}"
        )
        print("-" * 112)
        lookup_failures = []
        with MigrationMapper(DB_CONFIG) as mapper:
            for rec in records[:10]:
                # Look up source ID from migration mapping
                source_id = 'N/A'
                try:
                    cursor = mapper.connection.cursor()
                    try:
                        cursor.execute("""
                            SELECT source_id[1]
                            FROM public.os_migration_mapping
                            WHERE tryton_id = %s
                            AND 'price_value' = ANY(object_type)
                            ORDER BY write_date DESC
                            LIMIT 1
                        """, (rec.id,))
                        result = cursor.fetchone()
                    finally:
                        # Always release the cursor, even when the query fails.
                        cursor.close()
                    if result and result[0]:
                        source_id = str(result[0])
                except Exception as exc:
                    # Best-effort: keep 'N/A' but remember why the lookup failed.
                    lookup_failures.append(f"ID {rec.id}: {exc}")

                # import_prices() stores the index on the ``price`` field and
                # identifies indexes by their ``price_index`` attribute.
                price = rec.price
                index_name = str(price.price_index or 'N/A') if price else 'N/A'
                print(
                    f"{rec.id:<8} {index_name:<30} {str(rec.price_date):<12}"
                    f" {str(rec.high_price):>10} {str(rec.low_price):>10}"
                    f" {str(rec.open_price):>10} {str(rec.price_value):>10}"
                    f" {source_id:<15}"
                )
        if lookup_failures:
            print(f"\n{len(lookup_failures)} migration mapping lookup(s) failed:")
            for failure in lookup_failures:
                print(f" - {failure}")
    else:
        print("No price records found")
    print()
# ---------------------------------------------------------------------------
# Check mapping stats
# ---------------------------------------------------------------------------
def check_mapping_stats():
    """Display statistics about migration mappings for price_value.

    Queries ``public.os_migration_mapping`` for the number of rows whose
    ``object_type`` contains ``'price_value'`` together with the earliest and
    latest ``write_date``, and prints them. Any error is printed with its
    traceback and swallowed; the function never raises to its caller.

    Returns:
        None. The report is written to stdout.
    """
    import traceback

    print(f"\n{'='*70}")
    print("MIGRATION MAPPING STATISTICS — price_value")
    print(f"{'='*70}\n")
    try:
        with MigrationMapper(DB_CONFIG) as mapper:
            cursor = mapper.connection.cursor()
            try:
                cursor.execute("""
                    SELECT
                    COUNT(*) AS count,
                    MIN(write_date) AS first_import,
                    MAX(write_date) AS last_import
                    FROM public.os_migration_mapping
                    WHERE 'price_value' = ANY(object_type)
                """)
                result = cursor.fetchone()
            finally:
                # Release the cursor even when the query or fetch fails.
                cursor.close()
            # COUNT(*) of 0 is falsy, so an empty table takes the else branch.
            if result and result[0]:
                total, first_import, last_import = result[0], result[1], result[2]
                print(f"Total price_value mappings : {total}")
                print(f"First import : {first_import.strftime('%Y-%m-%d %H:%M') if first_import else 'N/A'}")
                print(f"Last import : {last_import.strftime('%Y-%m-%d %H:%M') if last_import else 'N/A'}")
            else:
                print("No price_value mappings found yet")
    except Exception as e:
        print(f"Error retrieving mapping statistics: {e}")
        traceback.print_exc()
    print()
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Run the price import end to end and return a process exit code.

    Prints the script banner, connects to Tryton, then runs the import, the
    verification report and the mapping statistics in that order.

    Returns:
        1 when the Tryton connection could not be established, otherwise 0.
    """
    rule = "=" * 70
    banner_lines = (
        rule,
        "TRYTON PRICE IMPORT SCRIPT WITH MIGRATION MAPPING",
        "Using Proteus with XML-RPC Connection",
        rule,
    )
    for line in banner_lines:
        print(line)
    print()

    connected = connect_to_tryton()
    if not connected:
        return 1

    import_prices(CSV_FILE_PATH)
    verify_import()
    check_mapping_stats()
    return 0
if __name__ == '__main__':
    # Use sys.exit rather than the builtin exit(): the latter is injected by
    # the ``site`` module for interactive use and is not guaranteed to exist
    # (e.g. when Python is started with -S).
    sys.exit(main())