Initial commit

This commit is contained in:
AzureAD\SylvainDUVERNAY
2026-02-13 14:23:19 +01:00
commit 078843f991
80 changed files with 23994 additions and 0 deletions

View File

@@ -0,0 +1,239 @@
"""Diagnostic: identify custom fields on purchase.purchase.

Compares three views of the model:
  1. fields visible to Proteus over XML-RPC,
  2. columns actually present in the PostgreSQL table,
  3. persistence behaviour of a sample of suspected custom fields,
then prints recommendations for adding missing fields to the module.

Read-mostly: the persistence test saves test values but restores the
original value afterwards.
"""
from proteus import config, Model
import psycopg2

# XML-RPC Configuration (default connection method)
# NOTE(review): credentials are hard-coded; move to environment variables
# before sharing this script.
HTTPS = 'https://'
SERVER_URL = 'itsa.open-squared.tech'
DATABASE_NAME = 'tradon'
USERNAME = 'admin'
PASSWORD = 'dsproject'

# PostgreSQL Configuration (for direct database inspection)
DB_HOST = '72.61.163.139'
DB_PORT = 5433
DB_USER = 'postgres'
DB_PASSWORD = 'dsproject'

print("="*80)
print("CUSTOM FIELDS IDENTIFICATION FOR purchase.purchase")
print("="*80)

# Connect to Tryton via XML-RPC
print(f"\nConnecting via XML-RPC to {SERVER_URL}...")
config.set_xmlrpc(f'{HTTPS}{USERNAME}:{PASSWORD}@{SERVER_URL}/{DATABASE_NAME}/')
print("✓ Connected successfully\n")

Purchase = Model.get('purchase.purchase')

# Get all fields that Proteus sees (skip dunders and the CRUD helpers)
proteus_fields = sorted([key for key in dir(Purchase)
                         if not key.startswith('_')
                         and key not in ['create', 'delete', 'save', 'find',
                                         'copy', 'read', 'write', 'search']])
print(f"1. FIELDS VISIBLE TO PROTEUS: {len(proteus_fields)} fields")
print("-"*80)

# Standard Tryton purchase.purchase fields (from base module)
standard_purchase_fields = {
    'id', 'create_date', 'create_uid', 'write_date', 'write_uid',
    'company', 'party', 'invoice_party', 'invoice_address',
    'payment_term', 'warehouse', 'currency', 'description',
    'comment', 'state', 'purchase_date', 'invoice_method',
    'lines', 'invoices', 'invoices_ignored', 'invoices_recreated',
    'invoice_lines', 'invoice_lines_ignored', 'moves',
    'shipment_state', 'invoice_state', 'number', 'reference',
    'shipments', 'shipment_returns', 'rec_name', 'origin',
    'untaxed_amount', 'tax_amount', 'total_amount',
    'untaxed_amount_cache', 'tax_amount_cache', 'total_amount_cache',
    'delivery_date', 'party_lang', 'contact', 'xml_id'
}

# Anything Proteus exposes that is not a known base field is a candidate
# custom field (may still be a Function/related field, see section 4).
potential_custom_fields = [f for f in proteus_fields if f not in standard_purchase_fields]
print(f"\n2. POTENTIAL CUSTOM FIELDS: {len(potential_custom_fields)} fields")
print("-"*80)
for field in potential_custom_fields:
    print(f" - {field}")

# Connect to PostgreSQL to get actual table columns
print(f"\n3. COLUMNS IN POSTGRESQL TABLE 'purchase_purchase'")
print("-"*80)
conn = None
cursor = None
try:
    conn = psycopg2.connect(
        dbname=DATABASE_NAME,
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASSWORD
    )
    cursor = conn.cursor()
    # Get all columns from purchase_purchase table
    cursor.execute("""
        SELECT
            column_name,
            data_type,
            character_maximum_length,
            is_nullable,
            column_default
        FROM information_schema.columns
        WHERE table_schema = 'public'
        AND table_name = 'purchase_purchase'
        ORDER BY ordinal_position;
    """)
    db_columns = cursor.fetchall()
    print(f"Total columns in database: {len(db_columns)}\n")

    # Standard columns that typically exist in purchase_purchase
    standard_db_columns = {
        'id', 'create_date', 'create_uid', 'write_date', 'write_uid',
        'company', 'party', 'invoice_party', 'invoice_address',
        'payment_term', 'warehouse', 'currency', 'description',
        'comment', 'state', 'purchase_date', 'invoice_method',
        'number', 'reference', 'delivery_date', 'contact',
        'shipment_state', 'invoice_state', 'origin',
        'untaxed_amount_cache', 'tax_amount_cache', 'total_amount_cache'
    }
    db_column_names = [col[0] for col in db_columns]
    custom_db_columns = [col for col in db_columns if col[0] not in standard_db_columns]
    print("Custom columns in database:")
    for col in custom_db_columns:
        col_name, data_type, max_length, nullable, default = col
        length_info = f"({max_length})" if max_length else ""
        # Fix: pad the combined "type(length)" string; previously only
        # length_info carried the :<15 padding, which misaligned columns.
        type_str = f"{data_type}{length_info}"
        print(f" - {col_name:<30} {type_str:<15} NULL: {nullable}")

    # Compare: Fields in Proteus vs Columns in DB
    print(f"\n4. COMPARISON: PROTEUS vs DATABASE")
    print("-"*80)
    # Fields in Proteus but NOT as direct columns in DB
    # (might be Many2One suffixes, One2Many, Function fields, etc.)
    proteus_only = set(potential_custom_fields) - set(db_column_names)
    if proteus_only:
        print(f"\nFields in Proteus but NOT as columns in DB ({len(proteus_only)}):")
        print("(These might be Many2One, One2Many, Function fields, etc.)")
        for field in sorted(proteus_only):
            print(f" - {field}")
    # Columns in DB but NOT visible in Proteus (these are the problem!)
    db_only = set([col[0] for col in custom_db_columns]) - set(proteus_fields)
    if db_only:
        print(f"\n⚠️ COLUMNS IN DATABASE BUT NOT VISIBLE IN PROTEUS ({len(db_only)}):")
        print("(These fields MUST be added to the Python model!)")
        for field in sorted(db_only):
            print(f" - {field}")
    # Fields that exist in BOTH Proteus and DB
    both = set(potential_custom_fields) & set([col[0] for col in custom_db_columns])
    if both:
        print(f"\n✓ Custom fields properly defined in BOTH Proteus and DB ({len(both)}):")
        for field in sorted(both):
            print(f" - {field}")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
finally:
    # Fix: close the cursor/connection even when the inspection fails;
    # previously they leaked on any exception inside the try block.
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()

# Test persistence of custom fields
print(f"\n5. TESTING FIELD PERSISTENCE")
print("-"*80)
try:
    # Find a draft purchase to test (drafts are safe to mutate)
    drafts = Purchase.find([('state', '=', 'draft')], limit=1)
    if drafts:
        test_purchase = drafts[0]
        test_id = test_purchase.id
        print(f"Testing with purchase ID: {test_id}")
        print("\nTesting custom fields (attempting to set and save):\n")
        # Test a sample of custom fields.
        # NOTE(review): 'reference' is in standard_purchase_fields, so it
        # will never appear in potential_custom_fields — that entry is
        # effectively dead; confirm whether it was meant to be tested.
        test_fields = {}
        if 'reference' in potential_custom_fields:
            test_fields['reference'] = 'TEST_REF'
        if 'crop' in potential_custom_fields:
            test_fields['crop'] = 'TEST_CROP'
        if 'forex' in potential_custom_fields:
            test_fields['forex'] = 'TEST_FOREX'
        if 'broker' in potential_custom_fields:
            test_fields['broker'] = 'TEST_BROKER'
        if 'certif' in potential_custom_fields:
            test_fields['certif'] = 'TEST_CERT'
        if 'wb' in potential_custom_fields:
            test_fields['wb'] = 'TEST_WB'
        for field_name, test_value in test_fields.items():
            try:
                original_value = getattr(test_purchase, field_name, None)
                setattr(test_purchase, field_name, test_value)
                test_purchase.save()
                # Reload a fresh copy to see what actually hit the database
                reloaded = Purchase(test_id)
                new_value = getattr(reloaded, field_name, None)
                if new_value == test_value:
                    print(f"{field_name}: PERSISTS correctly")
                    # Restore original value
                    setattr(reloaded, field_name, original_value)
                    reloaded.save()
                else:
                    print(f"{field_name}: Does NOT persist (expected: '{test_value}', got: '{new_value}')")
            except Exception as e:
                print(f"{field_name}: Error - {str(e)[:60]}")
    else:
        print("No draft purchases found for testing")
except Exception as e:
    print(f"Error during persistence testing: {e}")

print("\n" + "="*80)
print("SUMMARY & RECOMMENDATIONS")
print("="*80)
print("""
Next steps for your colleague:
1. Review the "⚠️ COLUMNS IN DATABASE BUT NOT VISIBLE IN PROTEUS" section
→ These fields exist in PostgreSQL but are missing from the Python model
2. Review fields that "Does NOT persist" in the testing section
→ These fields are visible but not working correctly
3. Add missing fields to your custom Tryton module:
File: modules/your_custom_module/purchase.py
from trytond.pool import PoolMeta
from trytond.model import fields
class Purchase(metaclass=PoolMeta):
__name__ = 'purchase.purchase'
# Add each missing field with appropriate type:
custom_field = fields.Char('Custom Field')
custom_number = fields.Integer('Custom Number')
custom_date = fields.Date('Custom Date')
custom_many2one = fields.Many2One('other.model', 'Reference')
# etc...
4. Increment module version in tryton.cfg
5. Update module: trytond-admin -d tradon -u your_custom_module
6. Restart Tryton server
7. Re-run this script to verify all fields work correctly
""")

View File

@@ -0,0 +1,46 @@
"""List every field Proteus exposes on purchase.purchase."""
from proteus import config, Model

# Connection parameters for the Tryton XML-RPC endpoint.
HTTPS = 'https://'
SERVER_URL = 'itsa.open-squared.tech'
DATABASE_NAME = 'tradon'
USERNAME = 'admin'
PASSWORD = 'dsproject'

try:
    # Open the XML-RPC session (credentials embedded in the URL).
    config.set_xmlrpc(f'{HTTPS}{USERNAME}:{PASSWORD}@{SERVER_URL}/{DATABASE_NAME}/')
    print(f"Connected to Tryton database '{DATABASE_NAME}' successfully!")
    Purchase = Model.get('purchase.purchase')
    try:
        # Prefer an existing record; otherwise use a fresh, unsaved instance.
        found = Purchase.find([], limit=1)
        sample = found[0] if found else Purchase()
        # Collect public attribute names, excluding the CRUD helpers.
        crud_helpers = ('create', 'delete', 'save', 'find')
        field_names = sorted(
            attr for attr in dir(sample)
            if not attr.startswith('_') and attr not in crud_helpers
        )
        print(f"\nTotal fields in purchase.purchase: {len(field_names)}")
        print("\nField list:")
        for field in field_names:
            print(f"{field}")
    except Exception as e:
        print(f"Could not inspect fields via instance: {e}")
except Exception as e:
    print(f"Connection or operation failed: {e}")
    print("\nPlease verify:")
    print(f" - Tryton server is running on {SERVER_URL}")
    print(f" - Database '{DATABASE_NAME}' exists")
    print(f" - Username and password are correct")

View File

@@ -0,0 +1,35 @@
"""Check whether the 'number' field on a draft purchase persists via Proteus."""
from proteus import config, Model

# Open the XML-RPC session.
config.set_xmlrpc(f'https://admin:dsproject@itsa.open-squared.tech/tradon/')
Purchase = Model.get('purchase.purchase')

print("=== Testing Number Field Persistence ===")
draft = Purchase(682)  # the ID from the previous test run
print(f"Before: number = {draft.number}, state = {draft.state}")

# Write a recognisable value and save.
draft.number = "MANUAL_TEST_001"
draft.save()
print(f"After save: number = {draft.number}")

# Fetch a fresh copy to see what actually reached the database.
draft_reloaded = Purchase(682)
print(f"After reload: number = {draft_reloaded.number}")

if draft_reloaded.number == "MANUAL_TEST_001":
    print("✓ SUCCESS: Number WAS persisted via Proteus!")
else:
    print(f"✗ FAILED: Number NOT persisted. Got: {draft_reloaded.number}")
    print("\nThis means the 'number' field is likely:")
    explanations = (
        " 1. Read-only (controlled by Tryton workflow)",
        " 2. Auto-generated by a sequence",
        " 3. Overwritten by server-side logic",
    )
    for line in explanations:
        print(line)

# Cross-check directly in PostgreSQL.
print("\n=== Verify in PostgreSQL ===")
print("Run this SQL query to confirm:")
print("SELECT id, number, state FROM purchase_purchase WHERE id = 682;")

View File

@@ -0,0 +1,44 @@
"""Create a minimal purchase order on the Tradon Tryton instance.

The hard-coded ids (company 6, party 2776, product 12, weight basis 1)
are fixtures on the target database — TODO confirm they still exist.
"""
from proteus import config, Model
from decimal import getcontext, Decimal, ROUND_HALF_UP

# XML-RPC Configuration
HTTPS = 'https://'
SERVER_URL = 'itsa.open-squared.tech'
DATABASE_NAME = 'tradon'
USERNAME = 'admin'
PASSWORD = 'dsproject'

# Fix: do not rebind the name ``config`` — the original assigned the
# return value of set_xmlrpc() over the imported module, shadowing it.
config.set_xmlrpc(f'{HTTPS}{USERNAME}:{PASSWORD}@{SERVER_URL}/{DATABASE_NAME}/')

# Model handles (the unused currency.currency lookup was removed —
# it triggered an RPC round-trip for nothing).
Company = Model.get('company.company')
Party = Model.get('party.party')
Purchase = Model.get('purchase.purchase')
Product = Model.get('product.product')
Wb = Model.get('purchase.weight.basis')

# Fetch the referenced records
company = Company(6)
party = Party(2776)

# Build the purchase order (tol_min and wb are custom fields)
purchase = Purchase()
purchase.company = company
purchase.party = party
purchase.currency = company.currency
purchase.tol_min = Decimal(1)
purchase.wb = Wb(1)

# Purchase line
product = Product(12)  # product id
line = purchase.lines.new()
line.product = product
line.quantity = 10
line.unit_price = product.cost_price

# Save header and line in one call
purchase.save()
print(f"Purchase créée : {purchase.id}")

View File

@@ -0,0 +1,45 @@
"""Create a minimal (header-only) sale order on the Tradon Tryton instance.

The line-creation section is intentionally commented out; the script
currently exercises only the custom header fields (tol_min, wb,
from_location).  Hard-coded ids are fixtures on the target database.
"""
from proteus import config, Model
from decimal import getcontext, Decimal, ROUND_HALF_UP

# XML-RPC Configuration
HTTPS = 'https://'
SERVER_URL = 'itsa.open-squared.tech'
DATABASE_NAME = 'tradon'
USERNAME = 'admin'
PASSWORD = 'dsproject'

# Do not rebind ``config`` — keep the module name intact.
config.set_xmlrpc(f'{HTTPS}{USERNAME}:{PASSWORD}@{SERVER_URL}/{DATABASE_NAME}/')

# Model handles.
# Fix: bind the model class to a capitalised name instead of reusing a
# lowercase ``sale`` for both the class and the instance (the original
# did ``sale = Model.get(...)`` followed by ``sale = sale()``).
# The unused currency.currency lookup was also removed.
Company = Model.get('company.company')
Party = Model.get('party.party')
Sale = Model.get('sale.sale')
Product = Model.get('product.product')
Wb = Model.get('purchase.weight.basis')
Location = Model.get('stock.location')

# Fetch the referenced records
company = Company(6)
party = Party(2789)
from_location = Location(1247)

# Build the sale order
sale = Sale()
sale.company = company
sale.party = party
sale.currency = company.currency
sale.tol_min = Decimal(1)
sale.wb = Wb(1)
sale.from_location = from_location

# Sale line (disabled for now)
# product = Product(12)  # product id
# line = sale.lines.new()
# line.product = product
# line.quantity = 10
# line.unit_price = product.cost_price

# Save
sale.save()
print(f"sale créée : {sale.id}")

View File

@@ -0,0 +1,11 @@
"""Debug helper: show what the shared helpers.config module exports."""
import sys
from pathlib import Path

# Make the repository root importable so ``helpers`` resolves.
repo_root = Path(__file__).parent.parent
sys.path.insert(0, str(repo_root))

import helpers.config as cfg

# Dump the module's namespace and the one setting we care about.
print("Available in config:", dir(cfg))
print("PURCHASE_FEES_CSV value:", getattr(cfg, 'PURCHASE_FEES_CSV', 'NOT FOUND'))

View File

@@ -0,0 +1,398 @@
import csv
from proteus import config, Model

# XML-RPC Configuration
# NOTE(review): credentials are hard-coded; consider environment variables.
HTTPS = 'https://'
SERVER_URL = 'itsa.open-squared.tech'
DATABASE_NAME = 'tradon'
USERNAME = 'admin'
PASSWORD = 'dsproject'

# CSV Configuration
# Absolute Windows path to the Parties loader file (update per machine).
CSV_FILE_PATH = r'C:\Users\SylvainDUVERNAY\Open Squared\Production - Documents\TRADON Implementation\ITSA\Reference Data\Loaders\Parties.csv'

# Default values
DEFAULT_COUNTRY = 'US' # Default country code if not specified
def connect_to_tryton():
    """Open an XML-RPC session to the configured Tryton server.

    Returns True on success; on failure prints troubleshooting hints
    and returns False.
    """
    print(f"Connecting to Tryton server: {SERVER_URL}")
    print(f"Database: {DATABASE_NAME}")
    print(f"Username: {USERNAME}")
    url = f'{HTTPS}{USERNAME}:{PASSWORD}@{SERVER_URL}/{DATABASE_NAME}/'
    try:
        config.set_xmlrpc(url)
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        print("\nTroubleshooting:")
        print(" - Verify the server URL is correct and accessible")
        print(" - Check that the Tryton server is running")
        print(" - Verify username and password are correct")
        print(" - Make sure you can access the server in a browser")
        return False
    print("✓ Connected successfully!\n")
    return True
def get_country(country_code):
    """Resolve a country record from its two-letter code.

    Falls back to DEFAULT_COUNTRY, then to the first country in the
    database; raises ValueError if the country table is empty.
    """
    Country = Model.get('country.country')
    if not country_code:
        country_code = DEFAULT_COUNTRY
    exact = Country.find([('code', '=', country_code.upper())])
    if exact:
        return exact[0]
    print(f" ⚠ Warning: Country '{country_code}' not found, using '{DEFAULT_COUNTRY}'")
    fallback = Country.find([('code', '=', DEFAULT_COUNTRY)])
    if fallback:
        return fallback[0]
    # Last resort: any country at all.
    any_country = Country.find([])
    if any_country:
        print(f" ⚠ Using first available country: {any_country[0].name}")
        return any_country[0]
    raise ValueError("No countries found in database!")
def get_subdivision(country, subdivision_code):
    """Look up a state/province for *country* by its code.

    Tries the conventional "CC-SUB" composite code first, then a suffix
    match; returns None (after a warning) when nothing matches.
    """
    if not subdivision_code:
        return None
    Subdivision = Model.get('country.subdivision')
    domains = [
        # Exact composite code, e.g. "US-CA".
        [('code', '=', f"{country.code}-{subdivision_code}"),
         ('country', '=', country.id)],
        # Fallback: any code ending with the bare subdivision code.
        [('code', 'ilike', f"%{subdivision_code}"),
         ('country', '=', country.id)],
    ]
    for domain in domains:
        hits = Subdivision.find(domain)
        if hits:
            return hits[0]
    print(f" ⚠ Warning: Subdivision '{subdivision_code}' not found for country {country.code}")
    return None
def check_party_exists_by_name(name):
    """Return the first party whose name matches exactly, else None."""
    Party = Model.get('party.party')
    matches = Party.find([('name', '=', name)])
    if matches:
        return matches[0]
    return None
def create_party_with_addresses(row):
    """Create a new party with address(es) using proteus.

    *row* is a dict of already-stripped CSV values (built by
    import_parties).  The party is saved first and the address saved
    separately afterwards, because the address needs a saved party to
    reference.  Any empty auto-created addresses are then deleted, and
    the freshly reloaded party is returned.
    """
    Party = Model.get('party.party')
    Address = Model.get('party.address')
    # Create party - let Tryton auto-generate the code
    party = Party()
    party.name = row['name']
    if row.get('tax_identifier'):
        party.tax_identifier = row['tax_identifier']
    if row.get('vat_code'):
        party.vat_code = row['vat_code']
    # Save the party FIRST (without addresses)
    party.save()
    # Check if we have meaningful address data.
    # Require at least street OR city to be present (not empty).
    has_street = bool(row.get('street'))
    has_city = bool(row.get('city'))
    has_postal_code = bool(row.get('postal_code'))
    has_country = bool(row.get('country_code'))
    # Create address only if we have at least street OR city
    if has_street or has_city:
        address = Address()
        # Link to the party we just created
        address.party = party
        if row.get('address_name'):
            address.name = row['address_name']
        if has_street:
            address.street = row['street']
        if has_city:
            address.city = row['city']
        # Use postal_code instead of zip (Tryton field name)
        if has_postal_code:
            address.postal_code = row['postal_code']
        # Resolve country (defaults to DEFAULT_COUNTRY when absent)
        if has_country:
            country_code = row['country_code']
            country = get_country(country_code)
        else:
            country = get_country(DEFAULT_COUNTRY)
        address.country = country
        # Get subdivision (state/province) if provided
        if row.get('subdivision_code'):
            subdivision = get_subdivision(country, row['subdivision_code'])
            if subdivision:
                address.subdivision = subdivision
        # Save the address separately
        address.save()
    # Clean up any empty addresses that might have been auto-created
    # by the server.  Reload party to get fresh data first.
    party = Party(party.id)
    # Find and delete empty addresses
    addresses_to_delete = []
    for addr in party.addresses:
        # Consider an address empty if it has no street, city, or postal_code
        is_empty = (
            (not addr.street or not addr.street.strip()) and
            (not addr.city or not addr.city.strip()) and
            (not addr.postal_code or not addr.postal_code.strip())
        )
        if is_empty:
            addresses_to_delete.append(addr)
    # Delete empty addresses
    if addresses_to_delete:
        Address.delete(addresses_to_delete)
        print(f" Cleaned up {len(addresses_to_delete)} empty address(es)")
    # Reload party one more time to return clean data
    party = Party(party.id)
    return party
def import_parties(csv_file):
    """Import parties from CSV file.

    Reads *csv_file* (utf-8-sig, so an Excel BOM is tolerated), creates
    one party per row via create_party_with_addresses(), skips rows whose
    name already exists (in the database or earlier in the same file),
    and prints a per-row log followed by a summary.
    """
    imported_count = 0
    skipped_count = 0
    error_count = 0
    errors = []
    # Track names we've already processed in this run
    processed_names = set()
    print(f"{'='*70}")
    print(f"Importing parties from: {csv_file}")
    print(f"{'='*70}\n")
    try:
        # Open with utf-8-sig to handle BOM
        with open(csv_file, 'r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file)
            # Debug: Show detected columns
            print(f"Detected columns: {reader.fieldnames}\n")
            # Row numbers start at 2: row 1 is the header.
            for row_num, row in enumerate(reader, start=2):
                try:
                    # Clean up values
                    # NOTE(review): if an error occurred before ``name`` is
                    # assigned, the except block below would raise NameError
                    # on the first row — unlikely with dict rows, but worth
                    # confirming.
                    name = row.get('name', '').strip()
                    tax_identifier = row.get('tax_identifier', '').strip()
                    vat_code = row.get('vat_code', '').strip()
                    # Address fields
                    address_name = row.get('address_name', '').strip()
                    street = row.get('street', '').strip()
                    city = row.get('city', '').strip()
                    # Handle both 'zip' and 'postal_code' column names
                    postal_code = row.get('postal_code', '').strip() or row.get('zip', '').strip()
                    country_code = row.get('country_code', '').strip()
                    subdivision_code = row.get('subdivision_code', '').strip()
                    # Skip empty rows
                    if not name:
                        continue
                    # Treat literal 'NULL' / '0' postal codes as absent
                    if postal_code and postal_code.upper() in ['NULL', '0']:
                        postal_code = ''
                    print(f"Processing Row {row_num}: {name}")
                    # Check if we've already processed this name in this import run
                    if name in processed_names:
                        print(f" ⚠ Duplicate name in CSV: '{name}'")
                        print(f" Skipping duplicate entry...\n")
                        skipped_count += 1
                        continue
                    # Check if party already exists in database
                    existing_party = check_party_exists_by_name(name)
                    if existing_party:
                        print(f" ⚠ Party '{name}' already exists with code: {existing_party.code}")
                        print(f" Skipping...\n")
                        skipped_count += 1
                        processed_names.add(name)
                        continue
                    # Create the party with address
                    row_data = {
                        'name': name,
                        'tax_identifier': tax_identifier,
                        'vat_code': vat_code,
                        'address_name': address_name,
                        'street': street,
                        'city': city,
                        'postal_code': postal_code,
                        'country_code': country_code,
                        'subdivision_code': subdivision_code
                    }
                    party = create_party_with_addresses(row_data)
                    # Mark this name as processed
                    processed_names.add(name)
                    print(f" ✓ Created party")
                    print(f" Party ID: {party.id}")
                    print(f" Auto-generated Code: {party.code}")
                    print(f" Name: {name}")
                    if tax_identifier:
                        print(f" Tax Identifier: {tax_identifier}")
                    if vat_code:
                        print(f" VAT Code: {vat_code}")
                    if party.addresses:
                        print(f" Addresses: {len(party.addresses)}")
                        for addr in party.addresses:
                            # Truncate long street strings for display
                            addr_street = (addr.street[:50] + '...') if addr.street and len(addr.street) > 50 else (addr.street or 'N/A')
                            addr_city = addr.city if addr.city else 'N/A'
                            addr_postal = addr.postal_code if addr.postal_code else 'N/A'
                            print(f" - {addr_street}")
                            print(f" {addr_city}, {addr_postal}")
                    else:
                        print(f" Addresses: 0 (no address data provided)")
                    print()
                    imported_count += 1
                except Exception as e:
                    # Per-row failure: record it and keep importing.
                    error_msg = f"Row {row_num} - {name}: {str(e)}"
                    errors.append(error_msg)
                    error_count += 1
                    print(f"✗ Error on row {row_num}: {e}\n")
                    import traceback
                    traceback.print_exc()
        # Summary
        print(f"{'='*70}")
        print("IMPORT SUMMARY")
        print(f"{'='*70}")
        print(f"Successfully imported: {imported_count} parties")
        print(f"Skipped (already exist or duplicates): {skipped_count} parties")
        print(f"Errors: {error_count}")
        if errors:
            print(f"\nError details:")
            for error in errors:
                print(f" - {error}")
        print(f"\n{'='*70}")
    except FileNotFoundError:
        print(f"✗ Error: CSV file not found at {csv_file}")
        print(f"Please update CSV_FILE_PATH in the script with the correct path.")
    except Exception as e:
        print(f"✗ Fatal error: {e}")
        import traceback
        traceback.print_exc()
def verify_import():
    """Print a summary table of the most recently created parties."""
    Party = Model.get('party.party')
    print(f"\n{'='*70}")
    print("VERIFICATION - Parties")
    print(f"{'='*70}\n")
    # Newest first, so the slice below shows the latest imports.
    all_parties = Party.find([], order=[('id', 'DESC')])
    if not all_parties:
        print("No parties found")
        print()
        return
    print(f"Found {len(all_parties)} parties (showing last 20):\n")
    print(f"{'Code':<15} {'Name':<40} {'Addresses':<10}")
    print("-" * 70)
    for record in all_parties[:20]:
        display_code = record.code or 'N/A'
        display_name = record.name[:39] if record.name else 'N/A'
        n_addresses = len(record.addresses) if record.addresses else 0
        print(f"{display_code:<15} {display_name:<40} {n_addresses:<10}")
    print()
def list_available_countries():
    """Debug helper: print up to 20 countries present in the database."""
    print(f"\n{'='*70}")
    print("AVAILABLE COUNTRIES (first 20)")
    print(f"{'='*70}\n")
    Country = Model.get('country.country')
    all_countries = Country.find([])
    if not all_countries:
        print("No countries found")
        print()
        return
    print(f"Found {len(all_countries)} countries:\n")
    for entry in all_countries[:20]:
        print(f" - {entry.code}: {entry.name}")
    remaining = len(all_countries) - 20
    if remaining > 0:
        print(f" ... and {remaining} more")
    print()
def main():
    """Entry point: connect to Tryton, run the CSV import, then verify."""
    banner = "=" * 70
    print(banner)
    print("TRYTON PARTY IMPORT SCRIPT")
    print("Using Proteus with XML-RPC Connection")
    print("Party codes will be auto-generated by Tryton")
    print(banner)
    print()
    # Abort early if we cannot reach the server.
    if not connect_to_tryton():
        return 1
    # Debugging helper (disabled by default):
    # list_available_countries()
    import_parties(CSV_FILE_PATH)
    verify_import()
    return 0


if __name__ == '__main__':
    exit(main())

View File

@@ -0,0 +1,364 @@
import sys
from pathlib import Path

# Add parent directory to Python path so we can import helpers
parent_dir = Path(__file__).parent.parent
sys.path.insert(0, str(parent_dir))

import csv
from decimal import Decimal
from proteus import config, Model
from helpers.config import (
    PURCHASE_FEES_CSV,
    connect_to_tryton)
from helpers.tryton_helpers import (
    find_party_by_name,
    find_product_by_code,
    find_purchase_contract_by_ref,
    find_contract_line_by_sequence,
    find_currency_by_code,
    parse_decimal,
    find_supplier_category,
    ensure_party_is_supplier,
    find_fee_mode_by_name,
    find_payable_receivable_by_name,
    get_existing_fees_for_line,
    fee_already_exists)

# CSV Configuration
# The fees CSV path is centralised in helpers.config.
CSV_FILE_PATH = PURCHASE_FEES_CSV

# Import options
AUTO_ENABLE_SUPPLIER = True # Set to False to skip auto-enabling supplier flag
SKIP_NON_SUPPLIERS = False # Set to True to skip parties that aren't suppliers
def import_purchase_contract_fees(csv_file):
    """Import purchase contract line fees from CSV.

    Each row references a contract (by reference), a contract line (by
    sequence), plus a fee product, supplier, currency, payable/receivable
    flag, mode and price.  Contract/line lookups are cached across
    consecutive rows sharing the same reference/sequence.  Rows with
    missing or unresolvable data are skipped; existing fees are not
    duplicated.  Prints a per-row log and a final summary.
    """
    print(f"{'='*70}")
    print("IMPORTING PURCHASE CONTRACT LINE FEES")
    print(f"{'='*70}\n")
    # Get models
    try:
        PurchaseLineFee = Model.get('fee.fee')
    except Exception as e:
        print(f"✗ Error: Could not load fee.fee model - {e}")
        print("Please ensure the model name is correct for your Tryton customization")
        return
    imported_count = 0
    skipped_count = 0
    error_count = 0
    errors = []
    try:
        with open(csv_file, 'r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file)
            # Lookup caches: consecutive CSV rows usually share a contract/line.
            current_contract_ref = None
            current_contract = None
            current_line_sequence = None
            current_line = None
            # Fix: pre-bind contract_ref so the except handler below cannot
            # raise NameError if a row fails before the assignment.
            contract_ref = ''
            for row_num, row in enumerate(reader, start=2):  # Start at 2 (header is row 1)
                try:
                    # Extract data from CSV
                    contract_ref = row.get('contract_ref', '').strip()
                    line_sequence = row.get('line_sequence', '').strip()
                    product_code = row.get('product', '').strip()
                    supplier_name = row.get('supplier', '').strip()
                    currency_code = row.get('currency', '').strip()
                    p_r_value = row.get('p_r', '').strip()
                    mode_name = row.get('mode', '').strip()
                    price_value = row.get('price', '').strip()
                    unit_value = row.get('unit', '').strip()
                    print(f"Processing row {row_num}: {contract_ref} - Line {line_sequence} - {product_code}")
                    # Validate required fields
                    if not contract_ref:
                        print(f" ✗ Skipping: Missing contract_ref\n")
                        skipped_count += 1
                        continue
                    if not line_sequence:
                        print(f" ✗ Skipping: Missing line_sequence\n")
                        skipped_count += 1
                        continue
                    if not product_code:
                        print(f" ✗ Skipping: Missing product\n")
                        skipped_count += 1
                        continue
                    # Refresh the contract cache when the reference changes
                    if contract_ref != current_contract_ref:
                        current_contract = find_purchase_contract_by_ref(contract_ref)
                        current_contract_ref = contract_ref
                        current_line_sequence = None
                        current_line = None
                    if not current_contract:
                        print(f" ✗ Skipping: Contract not found\n")
                        skipped_count += 1
                        continue
                    # Refresh the line cache when the sequence changes
                    if line_sequence != current_line_sequence:
                        current_line = find_contract_line_by_sequence(current_contract, line_sequence)
                        current_line_sequence = line_sequence
                    if not current_line:
                        print(f" ✗ Skipping: Contract line not found\n")
                        skipped_count += 1
                        continue
                    # Find related records
                    product = find_product_by_code(product_code)
                    if not product:
                        print(f" ✗ Skipping: Product not found\n")
                        skipped_count += 1
                        continue
                    supplier = find_party_by_name(supplier_name)
                    if not supplier:
                        print(f" ✗ Skipping: Supplier not found\n")
                        skipped_count += 1
                        continue
                    # Ensure party has SUPPLIER category.
                    # Fix: removed the leftover ``current_purchase = None``
                    # assignments in the branches below — that variable was
                    # never defined or read anywhere in this function (a
                    # copy/paste remnant from another import script).
                    supplier, is_supplier = ensure_party_is_supplier(supplier, auto_enable=AUTO_ENABLE_SUPPLIER)
                    if not is_supplier:
                        if SKIP_NON_SUPPLIERS:
                            print(f" ⚠ Skipping purchase - party does not have SUPPLIER category\n")
                            skipped_count += 1
                            continue
                        else:
                            error_msg = f"Row {row_num}: Party '{supplier.rec_name}' does not have SUPPLIER category"
                            errors.append(error_msg)
                            error_count += 1
                            continue
                    currency = find_currency_by_code(currency_code)
                    if not currency:
                        print(f" ✗ Skipping: Currency not found\n")
                        skipped_count += 1
                        continue
                    # Parse price
                    price = parse_decimal(price_value, 'price')
                    if price is None:
                        print(f" ✗ Skipping: Invalid price\n")
                        skipped_count += 1
                        continue
                    # Determine payable/receivable
                    payable_receivable = find_payable_receivable_by_name(p_r_value)
                    # Find fee mode
                    mode = find_fee_mode_by_name(mode_name)
                    # Check if fee already exists (idempotent re-runs)
                    existing_fees = get_existing_fees_for_line(current_line)
                    if fee_already_exists(existing_fees, product, supplier, price):
                        print(f" ○ Fee already exists for this line\n")
                        skipped_count += 1
                        continue
                    # Create the fee
                    fee = PurchaseLineFee()
                    fee.line = current_line
                    fee.product = product
                    fee.supplier = supplier
                    fee.currency = currency
                    fee.price = price
                    # Optional fields are set only when the custom model
                    # defines them (hasattr guards keep this portable).
                    if mode and hasattr(fee, 'type'):
                        fee.type = 'ordered'  # Assuming all imported fees are 'ordered'
                    if mode and hasattr(fee, 'weight_type'):
                        fee.weight_type = 'brut'
                    if mode and hasattr(fee, 'p_r'):
                        fee.p_r = payable_receivable
                    if mode and hasattr(fee, 'mode'):
                        fee.mode = mode
                    # Set unit if field exists: match by symbol, then by name
                    if unit_value and hasattr(fee, 'unit'):
                        Unit = Model.get('product.uom')
                        units = Unit.find([('symbol', '=', unit_value)])
                        if not units:
                            units = Unit.find([('name', '=', unit_value)])
                        if units:
                            fee.unit = units[0]
                    # Save the fee
                    fee.save()
                    print(f" ✓ Fee created successfully")
                    print(f" Product: {product.rec_name}")
                    print(f" Supplier: {supplier.rec_name}")
                    print(f" Price: {price} {currency.code}")
                    print(f" Type: {payable_receivable}")
                    print()
                    imported_count += 1
                except Exception as e:
                    # Per-row failure: record it and keep importing.
                    error_msg = f"Row {row_num} - {contract_ref}: {str(e)}"
                    errors.append(error_msg)
                    error_count += 1
                    print(f"✗ Error on row {row_num}: {e}\n")
                    import traceback
                    traceback.print_exc()
        # Summary
        print(f"{'='*70}")
        print("IMPORT SUMMARY")
        print(f"{'='*70}")
        print(f"Successfully imported: {imported_count} fees")
        print(f"Skipped (missing data or already exist): {skipped_count} fees")
        print(f"Errors: {error_count}")
        if errors:
            print(f"\nError details:")
            for error in errors:
                print(f" - {error}")
        print(f"\n{'='*70}")
    except FileNotFoundError:
        print(f"✗ Error: CSV file not found at {csv_file}")
        print(f"Please update CSV_FILE_PATH in the script with the correct path.")
    except Exception as e:
        print(f"✗ Fatal error: {e}")
        import traceback
        traceback.print_exc()
def verify_import():
    """Print a table of the most recently created purchase contract fees."""
    print(f"\n{'='*70}")
    print("VERIFICATION - Purchase Contract Line Fees")
    print(f"{'='*70}\n")
    try:
        PurchaseLineFee = Model.get('fee.fee')
        # Newest first, so the slice below shows the latest imports.
        fees = PurchaseLineFee.find([], order=[('id', 'DESC')])
        if not fees:
            print("No fees found")
            print()
            return
        print(f"Found {len(fees)} fees (showing last 50):\n")
        print(f"{'ID':<8} {'Contract':<15} {'Product':<25} {'Supplier':<25} {'Price':<12} {'Type':<12}")
        print("-" * 105)
        for fee in fees[:50]:
            fee_id = fee.id
            # Walk fee -> line -> purchase to recover the contract reference.
            contract_ref = 'N/A'
            line = getattr(fee, 'line', None)
            if line:
                contract = getattr(line, 'purchase', None)
                if contract and getattr(contract, 'reference', None):
                    contract_ref = str(contract.reference)[:14]
            product = fee.product.rec_name[:24] if getattr(fee, 'product', None) else 'N/A'
            supplier = fee.supplier.rec_name[:24] if getattr(fee, 'supplier', None) else 'N/A'
            price = f"{fee.price:.2f}" if getattr(fee, 'price', None) else 'N/A'
            # Model variants name the payable/receivable field differently.
            if hasattr(fee, 'type'):
                fee_type = fee.type
            elif hasattr(fee, 'payable_receivable'):
                fee_type = fee.payable_receivable
            else:
                fee_type = 'N/A'
            print(f"{fee_id:<8} {contract_ref:<15} {product:<25} {supplier:<25} {price:<12} {fee_type:<12}")
        print()
    except Exception as e:
        print(f"✗ Error during verification: {e}")
        import traceback
        traceback.print_exc()
def list_purchase_contracts():
    """Debug helper: print up to 20 of the most recent purchase contracts."""
    Purchase = Model.get('purchase.purchase')
    print(f"\n{'='*70}")
    print("AVAILABLE PURCHASE CONTRACTS (first 20)")
    print(f"{'='*70}")
    recent = Purchase.find([], order=[('id', 'DESC')], limit=20)
    if not recent:
        print("No purchase contracts found")
        print(f"{'='*70}\n")
        return
    print(f"{'ID':<8} {'Reference':<20} {'Party':<30} {'State':<12}")
    print("-" * 70)
    for contract in recent:
        ref = contract.reference[:19] if contract.reference else 'N/A'
        who = contract.party.rec_name[:29] if contract.party else 'N/A'
        status = contract.state if contract.state else 'N/A'
        print(f"{contract.id:<8} {ref:<20} {who:<30} {status:<12}")
        # Show the line count when the contract has lines.
        if hasattr(contract, 'lines') and contract.lines:
            print(f" Lines: {len(contract.lines)}")
    print(f"{'='*70}\n")
def main():
    """Entry point: connect to Tryton, run the fee import, then verify."""
    divider = "=" * 70
    print(divider)
    print("TRYTON PURCHASE CONTRACT FEE IMPORT SCRIPT")
    print("Using Proteus with XML-RPC Connection")
    print(divider)
    print()
    # Abort early if we cannot reach the server.
    if not connect_to_tryton():
        return 1
    # Debugging helper (disabled by default):
    # list_purchase_contracts()
    import_purchase_contract_fees(CSV_FILE_PATH)
    verify_import()
    return 0


if __name__ == '__main__':
    exit(main())

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,356 @@
import csv
from proteus import config, Model
from decimal import Decimal
# XML-RPC Configuration
# NOTE(review): admin credentials are hard-coded in source; move them to
# environment variables or a config file before sharing this script.
HTTPS = 'https://'
SERVER_URL = 'itsa.open-squared.tech'
DATABASE_NAME = 'tradon'
USERNAME = 'admin'
PASSWORD = 'dsproject'
# CSV Configuration
CSV_FILE_PATH = r'C:\Users\SylvainDUVERNAY\Open Squared\Production - Documents\TRADON Implementation\ITSA\Reference Data\Loaders\Services.csv' # UPDATE THIS PATH!
# Product configuration
PRODUCT_TYPE = 'service' # Service type products
DEFAULT_CATEGORY = 'SERVICES' # Default category name if not found
DEFAULT_UOM = 'Mt' # Default UOM if not found
def connect_to_tryton():
    """Open an XML-RPC session to the Tryton server.

    Returns:
        True when the connection is established, False otherwise (with
        troubleshooting hints printed for the operator).
    """
    print(f"Connecting to Tryton server: {SERVER_URL}")
    print(f"Database: {DATABASE_NAME}")
    print(f"Username: {USERNAME}")
    try:
        # Basic-auth credentials are embedded in the HTTPS URL.
        url = f'{HTTPS}{USERNAME}:{PASSWORD}@{SERVER_URL}/{DATABASE_NAME}/'
        config.set_xmlrpc(url)
        print("✓ Connected successfully!\n")
        return True
    except Exception as exc:
        print(f"✗ Connection failed: {exc}")
        print("\nTroubleshooting:")
        for hint in (
            " - Verify the server URL is correct and accessible",
            " - Check that the Tryton server is running",
            " - Verify username and password are correct",
            " - Make sure you can access the server in a browser",
        ):
            print(hint)
        return False
def get_or_create_category(category_name):
    """Return the product category named *category_name*, creating it if absent."""
    Category = Model.get('product.category')
    existing = Category.find([('name', '=', category_name)])
    if existing:
        print(f" Found existing category: {category_name}")
        return existing[0]
    # Not found: create and persist a fresh category record.
    created = Category()
    created.name = category_name
    created.save()
    print(f" ✓ Created new category: {category_name}")
    return created
def get_uom(uom_name):
    """Find a Unit of Measure by name.

    Lookup order:
      1. exact server-side name match;
      2. case-insensitive scan of all UOMs;
      3. the configured DEFAULT_UOM;
      4. the first UOM in the database.

    Raises:
        ValueError: if the database contains no UOM at all.
    """
    Uom = Model.get('product.uom')
    # Try exact match first (cheap server-side search).
    uoms = Uom.find([('name', '=', uom_name)])
    if uoms:
        return uoms[0]
    # Case-insensitive fallback: fetch the full list ONCE and reuse it for
    # the last-resort fallback below (previously fetched twice).
    all_uoms = Uom.find([])
    for uom in all_uoms:
        if uom.name.lower() == uom_name.lower():
            return uom
    # Not found at all: fall back to the configured default UOM.
    print(f" ⚠ Warning: UOM '{uom_name}' not found, using '{DEFAULT_UOM}'")
    default_uoms = Uom.find([('name', '=', DEFAULT_UOM)])
    if default_uoms:
        return default_uoms[0]
    # Last resort: any UOM at all.
    if all_uoms:
        print(f" ⚠ Using first available UOM: {all_uoms[0].name}")
        return all_uoms[0]
    raise ValueError("No UOM found in database!")
def check_product_exists(code):
    """Return the product whose code equals *code*, or None if absent."""
    Product = Model.get('product.product')
    matches = Product.find([('code', '=', code)])
    if not matches:
        return None
    return matches[0]
def create_service_product(row, categories, uom):
    """Create a new service product using proteus.

    Args:
        row: dict with string keys 'name', 'code', 'sale_price',
            'cost_price' and optionally 'description'.
        categories: a single category record or a list of them.
        uom: default unit-of-measure record for the template.

    Returns:
        The product variant Tryton auto-created for the new template.

    Raises:
        ValueError: if saving the template did not auto-create a variant.
    """
    Template = Model.get('product.template')
    # Create template
    template = Template()
    template.name = row['name']
    template.code = row['code']
    template.type = PRODUCT_TYPE
    template.list_price = Decimal(row['sale_price']) if row['sale_price'] else Decimal('0.00')
    template.cost_price_method = 'fixed' # Services use fixed cost price
    template.default_uom = uom
    # Link to categories (Many2Many relationship)
    # Use append() instead of direct assignment
    if isinstance(categories, list):
        template.categories.extend(categories) # Use extend for lists
    else:
        template.categories.append(categories) # Use append for single category
    template.salable = False # Services are not salable products by default
    template.purchasable = True # Services are purchasable
    if row.get('description'):
        template.description = row['description']
    # Save the template first
    template.save()
    # Now update the product that was auto-created
    # When a template is created, Tryton automatically creates a default product
    if template.products:
        product = template.products[0]
        #product.code = row['code']
        product.suffix_code = row['code'] # Use suffix_code to set product code
        # Set cost price on the product (variant-level in this Tryton setup)
        product.cost_price = Decimal(row['cost_price']) if row['cost_price'] else Decimal('0.00')
        product.save()
        return product
    else:
        raise ValueError("No product was created automatically with template")
def import_services(csv_file):
    """Import service products from a CSV file.

    Expects columns: code, name, category, uom, sale_price, cost_price,
    description. Rows whose product code already exists are skipped, and
    each row failure is recorded and reported without aborting the run.
    """
    imported_count = 0
    skipped_count = 0
    error_count = 0
    errors = []
    print(f"{'='*70}")
    print(f"Importing service products from: {csv_file}")
    print(f"{'='*70}\n")
    try:
        # Open with utf-8-sig to handle BOM (Excel-generated CSVs)
        with open(csv_file, 'r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file)
            # Debug: Show detected columns
            print(f"Detected columns: {reader.fieldnames}\n")
            # start=2 so row numbers match the spreadsheet (header is row 1)
            for row_num, row in enumerate(reader, start=2):
                try:
                    # Clean up values; blanks fall back to defaults
                    code = row.get('code', '').strip()
                    name = row.get('name', '').strip()
                    category_name = row.get('category', DEFAULT_CATEGORY).strip() or DEFAULT_CATEGORY
                    uom_name = row.get('uom', DEFAULT_UOM).strip() or DEFAULT_UOM
                    sale_price = row.get('sale_price', '0.00').strip() or '0.00'
                    cost_price = row.get('cost_price', '0.00').strip() or '0.00'
                    description = row.get('description', '').strip()
                    # Skip empty rows
                    if not code and not name:
                        continue
                    # Validate required fields
                    if not code or not name:
                        errors.append(f"Row {row_num}: Missing code or name")
                        error_count += 1
                        print(f"✗ Row {row_num}: Missing required fields")
                        continue
                    print(f"Processing Row {row_num}: {code} - {name}")
                    # Check if product already exists (dedupe by code)
                    existing_product = check_product_exists(code)
                    if existing_product:
                        print(f" ⚠ Product code '{code}' already exists: {existing_product.template.name}")
                        print(f" Skipping...\n")
                        skipped_count += 1
                        continue
                    # Get or create category
                    category = get_or_create_category(category_name)
                    # Get UOM
                    uom = get_uom(uom_name)
                    print(f" Using UOM: {uom.name}")
                    # Create the product from the cleaned values
                    row_data = {
                        'code': code,
                        'name': name,
                        'sale_price': sale_price,
                        'cost_price': cost_price,
                        'description': description
                    }
                    product = create_service_product(row_data, category, uom)
                    print(f" ✓ Created service product")
                    print(f" Product ID: {product.id}, Template ID: {product.template.id}")
                    print(f" Code: {code}")
                    print(f" Category: {category.name}")
                    print(f" Sale Price: {sale_price}")
                    print(f" Cost Price: {cost_price}")
                    if description:
                        print(f" Description: {description[:50]}...")
                    print()
                    imported_count += 1
                except Exception as e:
                    # Per-row failures are logged but do not stop the import
                    error_msg = f"Row {row_num} - {code} ({name}): {str(e)}"
                    errors.append(error_msg)
                    error_count += 1
                    print(f"✗ Error on row {row_num}: {e}\n")
                    import traceback
                    traceback.print_exc()
        # Summary
        print(f"{'='*70}")
        print("IMPORT SUMMARY")
        print(f"{'='*70}")
        print(f"Successfully imported: {imported_count} service products")
        print(f"Skipped (already exist): {skipped_count} products")
        print(f"Errors: {error_count}")
        if errors:
            print(f"\nError details:")
            for error in errors:
                print(f" - {error}")
        print(f"\n{'='*70}")
    except FileNotFoundError:
        print(f"✗ Error: CSV file not found at {csv_file}")
        print(f"Please update CSV_FILE_PATH in the script with the correct path.")
    except Exception as e:
        print(f"✗ Fatal error: {e}")
        import traceback
        traceback.print_exc()
def verify_import():
    """Print a table of every service-type product for post-import review."""
    Product = Model.get('product.product')
    print(f"\n{'='*70}")
    print("VERIFICATION - Service Products")
    print(f"{'='*70}\n")
    services = Product.find([('template.type', '=', 'service')])
    if not services:
        print("No service products found")
        print()
        return
    print(f"Found {len(services)} service products:\n")
    print(f"{'Code':<12} {'Name':<30} {'Categories':<25} {'Sale Price':<12}")
    print("-" * 85)
    for item in services:
        tpl = item.template
        code = item.code or 'N/A'
        name = tpl.name[:29] if tpl.name else 'N/A'
        # Categories is a Many2Many field; render a truncated comma list.
        if tpl.categories:
            cats = ', '.join(cat.name for cat in tpl.categories)[:24]
        else:
            cats = 'N/A'
        price = f"{tpl.list_price:.2f}" if tpl.list_price else '0.00'
        print(f"{code:<12} {name:<30} {cats:<25} {price:<12}")
    print()
def list_available_uoms():
    """Print every unit of measure defined in the database (debug helper)."""
    print(f"\n{'='*70}")
    print("AVAILABLE UNITS OF MEASURE")
    print(f"{'='*70}\n")
    Uom = Model.get('product.uom')
    records = Uom.find([])
    if not records:
        print("No UOMs found")
        print()
        return
    print(f"Found {len(records)} UOMs:\n")
    for record in records:
        # Symbol is optional on some setups; show it only when present.
        suffix = f"({record.symbol})" if getattr(record, 'symbol', None) else ""
        print(f" - {record.name} {suffix}")
    print()
def list_available_categories():
    """Print every product category defined in the database (debug helper)."""
    print(f"\n{'='*70}")
    print("AVAILABLE PRODUCT CATEGORIES")
    print(f"{'='*70}\n")
    Category = Model.get('product.category')
    records = Category.find([])
    if not records:
        print("No categories found")
        print()
        return
    print(f"Found {len(records)} categories:\n")
    for record in records:
        print(f" - {record.name}")
    print()
def main():
    """Entry point: connect, import service products, then verify.

    Returns:
        0 on success, 1 when the Tryton connection fails.
    """
    banner = "=" * 70
    print(banner)
    print("TRYTON SERVICE PRODUCT IMPORT SCRIPT")
    print("Using Proteus with XML-RPC Connection")
    print(banner)
    print()
    if not connect_to_tryton():
        return 1
    # Debug helpers — uncomment to inspect the database contents first:
    # list_available_uoms()
    # list_available_categories()
    import_services(CSV_FILE_PATH)
    verify_import()
    return 0
if __name__ == '__main__':
    # Raise SystemExit directly: the site-provided exit() builtin is absent
    # when Python runs with -S, while SystemExit always works.
    raise SystemExit(main())

View File

@@ -0,0 +1,310 @@
import csv
from proteus import config, Model
from decimal import Decimal
# Configuration
# NOTE(review): this is an older variant of the service-import script; the
# admin credentials are hard-coded inside connect_to_tryton() below.
DATABASE_NAME = 'tradon'
CSV_FILE_PATH = r'C:\Users\SylvainDUVERNAY\Open Squared\Production - Documents\TRADON Implementation\ITSA\Reference Data\Loaders\Services.csv' # UPDATE THIS PATH!
# Product configuration
PRODUCT_TYPE = 'service' # Service type products
DEFAULT_CATEGORY = 'Services' # Default category name if not found
DEFAULT_UOM = 'Mt' # Default UOM if not found
def connect_to_tryton():
    """Open the XML-RPC session to the Tryton server; return True on success."""
    print(f"Connecting to Tryton database: {DATABASE_NAME}")
    try:
        # Credentials are embedded in the URL (basic-auth over HTTPS).
        config.set_xmlrpc('https://admin:dsproject@itsa.open-squared.tech/tradon/')
        print("✓ Connected successfully!\n")
    except Exception as exc:
        print(f"✗ Connection failed: {exc}")
        return False
    return True
def get_or_create_category(category_name):
    """Return the category named *category_name*, creating it when missing."""
    Category = Model.get('product.category')
    found = Category.find([('name', '=', category_name)])
    if not found:
        # Create and persist a new category record.
        record = Category()
        record.name = category_name
        record.save()
        print(f" ✓ Created new category: {category_name}")
        return record
    print(f" Found existing category: {category_name}")
    return found[0]
def get_uom(uom_name):
    """Find a Unit of Measure by name.

    Lookup order:
      1. exact server-side name match;
      2. case-insensitive scan of all UOMs;
      3. the 'Unit' UOM;
      4. the first UOM in the database.

    Raises:
        ValueError: if the database contains no UOM at all.
    """
    Uom = Model.get('product.uom')
    # Try exact match first (cheap server-side search).
    uoms = Uom.find([('name', '=', uom_name)])
    if uoms:
        return uoms[0]
    # Case-insensitive fallback: fetch the full list ONCE and reuse it for
    # the last-resort fallback below (previously fetched twice).
    all_uoms = Uom.find([])
    for uom in all_uoms:
        if uom.name.lower() == uom_name.lower():
            return uom
    # If not found, return Unit (default)
    print(f" ⚠ Warning: UOM '{uom_name}' not found, using 'Unit'")
    default_uoms = Uom.find([('name', '=', 'Unit')])
    if default_uoms:
        return default_uoms[0]
    # Last resort: any UOM at all.
    if all_uoms:
        print(f" ⚠ Using first available UOM: {all_uoms[0].name}")
        return all_uoms[0]
    raise ValueError("No UOM found in database!")
def check_product_exists(code):
    """Return the product matching *code*, or None when no such product."""
    Product = Model.get('product.product')
    for hit in Product.find([('code', '=', code)]):
        return hit
    return None
def create_service_product(row, category, uom):
    """Create a new service product (template plus one explicit variant).

    Args:
        row: dict with string keys 'name', 'code', 'sale_price',
            'cost_price' and optionally 'description'.
        category: product.category record assigned to the template.
        uom: default unit-of-measure record.

    Returns:
        The newly created product.product variant.
    """
    Product = Model.get('product.product')
    Template = Model.get('product.template')
    # Create template first
    template = Template()
    template.name = row['name']
    template.type = PRODUCT_TYPE
    template.list_price = Decimal(row['sale_price']) if row['sale_price'] else Decimal('0.00')
    # NOTE(review): cost_price is set on the template here; recent Tryton
    # versions store cost_price on the variant — confirm against the server.
    template.cost_price = Decimal(row['cost_price']) if row['cost_price'] else Decimal('0.00')
    template.default_uom = uom
    # NOTE(review): assigns a single 'category' field; newer schemas use the
    # Many2Many 'categories' field instead — confirm which one applies.
    template.category = category
    template.salable = True
    template.purchasable = False # Services typically not purchased
    if row.get('description'):
        template.description = row['description']
    template.save()
    # Create product variant
    product = Product()
    product.template = template
    product.code = row['code']
    product.save()
    return product
def import_services(csv_file):
    """Import service products from a CSV file (older script variant).

    Expects columns: code, name, category, uom, sale_price, cost_price,
    description. Rows whose product code already exists are skipped; each
    row failure is recorded and reported without aborting the run.
    """
    Product = Model.get('product.product')  # NOTE(review): unused in this function
    imported_count = 0
    skipped_count = 0
    error_count = 0
    errors = []
    print(f"{'='*70}")
    print(f"Importing service products from: {csv_file}")
    print(f"{'='*70}\n")
    try:
        # Open with utf-8-sig to handle BOM (Excel-generated CSVs)
        with open(csv_file, 'r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file)
            # Debug: Show detected columns
            print(f"Detected columns: {reader.fieldnames}\n")
            # start=2 so row numbers match the spreadsheet (header is row 1)
            for row_num, row in enumerate(reader, start=2):
                try:
                    # Clean up values; blanks fall back to defaults
                    code = row.get('code', '').strip()
                    name = row.get('name', '').strip()
                    category_name = row.get('category', DEFAULT_CATEGORY).strip() or DEFAULT_CATEGORY
                    uom_name = row.get('uom', DEFAULT_UOM).strip() or DEFAULT_UOM
                    sale_price = row.get('sale_price', '0.00').strip()
                    cost_price = row.get('cost_price', '0.00').strip()
                    description = row.get('description', '').strip()
                    # Skip empty rows
                    if not code and not name:
                        continue
                    # Validate required fields
                    if not code or not name:
                        errors.append(f"Row {row_num}: Missing code or name")
                        error_count += 1
                        print(f"✗ Row {row_num}: Missing required fields")
                        continue
                    print(f"Processing Row {row_num}: {code} - {name}")
                    # Check if product already exists (dedupe by code)
                    existing_product = check_product_exists(code)
                    if existing_product:
                        print(f" ⚠ Product code '{code}' already exists: {existing_product.template.name}")
                        print(f" Skipping...")
                        skipped_count += 1
                        continue
                    # Get or create category
                    category = get_or_create_category(category_name)
                    # Get UOM
                    uom = get_uom(uom_name)
                    print(f" Using UOM: {uom.name}")
                    # Create the product from the cleaned values
                    row_data = {
                        'code': code,
                        'name': name,
                        'sale_price': sale_price,
                        'cost_price': cost_price,
                        'description': description
                    }
                    product = create_service_product(row_data, category, uom)
                    print(f" ✓ Created service product: {name}")
                    print(f" Code: {code}")
                    print(f" Category: {category.name}")
                    print(f" Sale Price: {sale_price}")
                    print(f" Cost Price: {cost_price}")
                    if description:
                        print(f" Description: {description[:50]}...")
                    print()
                    imported_count += 1
                except Exception as e:
                    # Per-row failures are logged but do not stop the import
                    error_msg = f"Row {row_num} - {code} ({name}): {str(e)}"
                    errors.append(error_msg)
                    error_count += 1
                    print(f"✗ Error on row {row_num}: {e}\n")
        # Summary
        print(f"{'='*70}")
        print("IMPORT SUMMARY")
        print(f"{'='*70}")
        print(f"Successfully imported: {imported_count} service products")
        print(f"Skipped (already exist): {skipped_count} products")
        print(f"Errors: {error_count}")
        if errors:
            print(f"\nError details:")
            for error in errors:
                print(f" - {error}")
        print(f"\n{'='*70}")
    except FileNotFoundError:
        print(f"✗ Error: CSV file not found at {csv_file}")
        print(f"Please update CSV_FILE_PATH in the script with the correct path.")
    except Exception as e:
        print(f"✗ Fatal error: {e}")
        import traceback
        traceback.print_exc()
def verify_import():
    """Print a summary table of all service-type products."""
    Product = Model.get('product.product')
    print(f"\n{'='*70}")
    print("VERIFICATION - Service Products")
    print(f"{'='*70}\n")
    found = Product.find([('template.type', '=', 'service')])
    if not found:
        print("No service products found")
        print()
        return
    print(f"Found {len(found)} service products:\n")
    print(f"{'Code':<12} {'Name':<35} {'Category':<20} {'Sale Price':<12}")
    print("-" * 80)
    for item in found:
        tpl = item.template
        code = item.code or 'N/A'
        name = tpl.name[:34] if tpl.name else 'N/A'
        category = tpl.category.name if tpl.category else 'N/A'
        price = f"{tpl.list_price:.2f}" if tpl.list_price else '0.00'
        print(f"{code:<12} {name:<35} {category:<20} {price:<12}")
    print()
def list_available_uoms():
    """Print every unit of measure defined in the database (debug helper)."""
    print(f"\n{'='*70}")
    print("AVAILABLE UNITS OF MEASURE")
    print(f"{'='*70}\n")
    Uom = Model.get('product.uom')
    records = Uom.find([])
    if not records:
        print("No UOMs found")
        print()
        return
    print(f"Found {len(records)} UOMs:\n")
    for record in records:
        # Symbol may be missing on older setups; fall back to 'N/A'.
        symbol = record.symbol if hasattr(record, 'symbol') else 'N/A'
        print(f" - {record.name} (Symbol: {symbol})")
    print()
def list_available_categories():
    """Print every product category defined in the database (debug helper)."""
    print(f"\n{'='*70}")
    print("AVAILABLE PRODUCT CATEGORIES")
    print(f"{'='*70}\n")
    Category = Model.get('product.category')
    found = Category.find([])
    if not found:
        print("No categories found")
        print()
        return
    print(f"Found {len(found)} categories:\n")
    for record in found:
        print(f" - {record.name}")
    print()
def main():
    """Entry point: connect, import service products, then verify.

    Returns:
        0 on success, 1 when the Tryton connection fails.
    """
    banner = "=" * 70
    print(banner)
    print("TRYTON SERVICE PRODUCT IMPORT SCRIPT (using Proteus)")
    print(banner)
    print()
    if not connect_to_tryton():
        return 1
    # Debug helpers — uncomment to inspect the database contents first:
    # list_available_uoms()
    # list_available_categories()
    import_services(CSV_FILE_PATH)
    verify_import()
    return 0
if __name__ == '__main__':
    # Raise SystemExit directly: the site-provided exit() builtin is absent
    # when Python runs with -S, while SystemExit always works.
    raise SystemExit(main())

View File

@@ -0,0 +1,397 @@
import csv
import psycopg2
from proteus import config, Model
# CSV Configuration
CSV_FILE_PATH = r'C:\Users\SylvainDUVERNAY\Open Squared\Production - Documents\TRADON Implementation\ITSA\Reference Data\Loaders\Customer_Stock_Locations.csv'
# XML-RPC Configuration
# NOTE(review): both the XML-RPC and PostgreSQL credentials are hard-coded
# below; move them to environment variables before distributing this script.
HTTPS = 'https://'
SERVER_URL = 'itsa.open-squared.tech'
DATABASE_NAME = 'tradon'
USERNAME = 'admin'
PASSWORD = 'dsproject'
# PostgreSQL Configuration (for direct database access)
DB_HOST = '72.61.163.139'
DB_PORT = 5433
DB_USER = 'postgres'
DB_PASSWORD = 'dsproject'
# Default values
DEFAULT_TYPE = 'storage' # Default location type if not specified
def connect_to_tryton():
    """Open an XML-RPC session to the Tryton server.

    Returns:
        True when the connection is established, False otherwise (with
        troubleshooting hints printed for the operator).
    """
    print(f"Connecting to Tryton server: {SERVER_URL}")
    print(f"Database: {DATABASE_NAME}")
    print(f"Username: {USERNAME}")
    try:
        # Basic-auth credentials are embedded in the HTTPS URL.
        url = f'{HTTPS}{USERNAME}:{PASSWORD}@{SERVER_URL}/{DATABASE_NAME}/'
        config.set_xmlrpc(url)
        print("✓ Connected successfully!\n")
        return True
    except Exception as exc:
        print(f"✗ Connection failed: {exc}")
        print("\nTroubleshooting:")
        for hint in (
            " - Verify the server URL is correct and accessible",
            " - Check that the Tryton server is running",
            " - Verify username and password are correct",
            " - Make sure you can access the server in a browser",
        ):
            print(hint)
        return False
def get_db_connection():
    """Open and return a raw psycopg2 connection, or None on failure."""
    params = dict(
        host=DB_HOST,
        port=DB_PORT,
        database=DATABASE_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
    )
    try:
        return psycopg2.connect(**params)
    except Exception as exc:
        print(f"✗ Database connection failed: {exc}")
        return None
def update_location_coordinates(location_id, latitude, longitude):
    """Update the lat/lon columns of a stock_location row via raw SQL.

    The custom lat/lon columns are not written through the Tryton model in
    this script, so they are updated directly with psycopg2.

    Args:
        location_id: id of the stock_location row to update.
        latitude, longitude: numeric values (or None) to store.

    Returns:
        True when exactly the targeted row was updated, False otherwise.
    """
    conn = get_db_connection()
    if not conn:
        print(f" ⚠ Could not update coordinates - database connection failed")
        return False
    try:
        # `with conn` commits on success and rolls back on error; the inner
        # `with conn.cursor()` closes the cursor in all cases. The previous
        # version leaked both the cursor and the connection when execute raised.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE stock_location
                    SET lat = %s, lon = %s
                    WHERE id = %s
                    """,
                    (latitude, longitude, location_id),
                )
                rows_affected = cursor.rowcount
        return rows_affected > 0
    except Exception as e:
        print(f" ⚠ Error updating coordinates: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Always release the connection, success or failure.
        conn.close()
def check_location_exists_by_name(name):
    """Return the first stock.location with exactly this name, else None."""
    Location = Model.get('stock.location')
    matches = Location.find([('name', '=', name)])
    if not matches:
        return None
    return matches[0]
def validate_location_type(loc_type):
    """Return *loc_type* lower-cased when it is a known stock.location type.

    Unknown, empty or missing values fall back to DEFAULT_TYPE (with a
    printed warning) so the import never creates an invalid location.
    """
    allowed = (
        'supplier', 'customer', 'lost_found', 'warehouse',
        'storage', 'production', 'drop', 'rental', 'view',
    )
    candidate = loc_type.lower() if loc_type else None
    if candidate in allowed:
        return candidate
    print(f" ⚠ Warning: Invalid type '{loc_type}', using default '{DEFAULT_TYPE}'")
    return DEFAULT_TYPE
def parse_coordinate(value, coord_name):
    """Parse *value* as a float coordinate; return None when unusable.

    Treats empty strings and NULL/NONE/N-A placeholder text as missing,
    and rejects values outside the valid range for *coord_name*
    ('latitude': -90..90, 'longitude': -180..180), printing a warning.
    """
    if not value:
        return None
    # Placeholder strings exported by spreadsheets count as "no value".
    if isinstance(value, str) and value.strip().upper() in ('NULL', 'NONE', 'N/A', ''):
        return None
    try:
        parsed = float(value)
    except (ValueError, TypeError) as err:
        print(f" ⚠ Warning: Invalid {coord_name} value '{value}' - {err}")
        return None
    # Range check depends on which coordinate is being parsed.
    limits = {'latitude': 90, 'longitude': 180}
    bound = limits.get(coord_name)
    if bound is not None and (parsed < -bound or parsed > bound):
        print(f" ⚠ Warning: {coord_name.capitalize()} {parsed} out of range (-{bound} to {bound})")
        return None
    return parsed
def create_location(row):
    """Create a stock.location from parsed row data.

    Saves the record through proteus, then writes any provided lat/lon
    straight to the database (the model does not expose those columns here).

    Returns:
        A (location, latitude, longitude) tuple.
    """
    Location = Model.get('stock.location')
    record = Location()
    record.name = row['name']
    record.type = row['type']
    record.save()
    lat = row.get('latitude')
    lon = row.get('longitude')
    # Only touch the database when at least one coordinate is present.
    if not (lat is None and lon is None):
        if not update_location_coordinates(record.id, lat, lon):
            print(f" ⚠ Location created but coordinates not saved")
    return record, lat, lon
def import_locations(csv_file):
    """Import stock locations from a CSV file.

    Expects columns: name, type, lat, lon. Each new location is created via
    proteus and its coordinates stored through direct SQL; duplicate names
    (within the CSV or already in the database) are skipped.
    """
    imported_count = 0
    skipped_count = 0
    error_count = 0
    errors = []
    # Track names we've already processed in this run (CSV-level dedupe)
    processed_names = set()
    print(f"{'='*70}")
    print(f"Importing locations from: {csv_file}")
    print(f"{'='*70}\n")
    try:
        # Open with utf-8-sig to handle BOM (Excel-generated CSVs)
        with open(csv_file, 'r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file)
            # Debug: Show detected columns
            print(f"Detected columns: {reader.fieldnames}\n")
            # start=2 so row numbers match the spreadsheet (header is row 1)
            for row_num, row in enumerate(reader, start=2):
                try:
                    # Clean up values - get directly from CSV columns
                    name = row.get('name', '').strip()
                    loc_type = row.get('type', '').strip() or DEFAULT_TYPE
                    lat_raw = row.get('lat', '').strip()
                    lon_raw = row.get('lon', '').strip()
                    # Skip empty rows
                    if not name:
                        continue
                    print(f"Processing Row {row_num}: {name}")
                    print(f" CSV Raw values - lat: '{lat_raw}', lon: '{lon_raw}'")
                    # Check if we've already processed this name in this import run
                    if name in processed_names:
                        print(f" ⚠ Duplicate name in CSV: '{name}'")
                        print(f" Skipping duplicate entry...\n")
                        skipped_count += 1
                        continue
                    # Check if location already exists in database
                    existing_location = check_location_exists_by_name(name)
                    if existing_location:
                        print(f" ⚠ Location '{name}' already exists (ID: {existing_location.id})")
                        print(f" Type: {existing_location.type}")
                        print(f" Skipping...\n")
                        skipped_count += 1
                        processed_names.add(name)
                        continue
                    # Validate location type (falls back to DEFAULT_TYPE)
                    loc_type = validate_location_type(loc_type)
                    # Parse coordinates (None when missing/invalid/out of range)
                    latitude = parse_coordinate(lat_raw, 'latitude')
                    longitude = parse_coordinate(lon_raw, 'longitude')
                    print(f" Parsed values - lat: {latitude}, lon: {longitude}")
                    # Create the location with parsed data
                    location_data = {
                        'name': name,
                        'type': loc_type,
                        'latitude': latitude,
                        'longitude': longitude
                    }
                    location, saved_lat, saved_lon = create_location(location_data)
                    # Mark this name as processed
                    processed_names.add(name)
                    print(f" ✓ Created location")
                    print(f" Location ID: {location.id}")
                    print(f" Name: {name}")
                    print(f" Type: {loc_type}")
                    if saved_lat is not None:
                        print(f" Latitude: {saved_lat}")
                    if saved_lon is not None:
                        print(f" Longitude: {saved_lon}")
                    print()
                    imported_count += 1
                except Exception as e:
                    # 'name' may be unbound if the failure happened before parsing
                    error_msg = f"Row {row_num} - {name if 'name' in locals() else 'Unknown'}: {str(e)}"
                    errors.append(error_msg)
                    error_count += 1
                    print(f"✗ Error on row {row_num}: {e}\n")
                    import traceback
                    traceback.print_exc()
        # Summary
        print(f"{'='*70}")
        print("IMPORT SUMMARY")
        print(f"{'='*70}")
        print(f"Successfully imported: {imported_count} locations")
        print(f"Skipped (already exist or duplicates): {skipped_count} locations")
        print(f"Errors: {error_count}")
        if errors:
            print(f"\nError details:")
            for error in errors:
                print(f" - {error}")
        print(f"\n{'='*70}")
    except FileNotFoundError:
        print(f"✗ Error: CSV file not found at {csv_file}")
        print(f"Please update CSV_FILE_PATH in the script with the correct path.")
    except Exception as e:
        print(f"✗ Fatal error: {e}")
        import traceback
        traceback.print_exc()
def verify_import():
    """Verify imported locations, reading coordinates from the database.

    Lists the 20 most recently created stock.location records via proteus
    and fetches each record's lat/lon directly from PostgreSQL (those
    columns are not read through the model here).
    """
    Location = Model.get('stock.location')
    print(f"\n{'='*70}")
    print("VERIFICATION - Stock Locations")
    print(f"{'='*70}\n")
    # Get database connection to read coordinates
    conn = get_db_connection()
    if not conn:
        print("Cannot verify - database connection failed")
        return
    # Newest first so the slice below shows the most recent records
    locations = Location.find([], order=[('id', 'DESC')])
    if locations:
        print(f"Found {len(locations)} locations (showing last 20):\n")
        print(f"{'ID':<8} {'Name':<35} {'Type':<12} {'Lat':<12} {'Lon':<12}")
        print("-" * 85)
        for location in locations[:20]: # Show last 20 created
            loc_id = location.id
            name = location.name[:34] if location.name else 'N/A'
            loc_type = location.type if location.type else 'N/A'
            # Get coordinates from database (one query per displayed row)
            lat = 'N/A'
            lon = 'N/A'
            try:
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT lat, lon FROM stock_location WHERE id = %s",
                    (loc_id,)
                )
                result = cursor.fetchone()
                if result:
                    lat = f"{result[0]:.6f}" if result[0] is not None else 'N/A'
                    lon = f"{result[1]:.6f}" if result[1] is not None else 'N/A'
                cursor.close()
            except Exception as e:
                print(f"Error reading coordinates for location {loc_id}: {e}")
            print(f"{loc_id:<8} {name:<35} {loc_type:<12} {lat:<12} {lon:<12}")
        conn.close()
    else:
        # NOTE(review): conn is not closed on this branch — confirm intended
        print("No locations found")
    print()
def main():
    """Entry point: connect via XML-RPC and PostgreSQL, import, verify.

    Returns:
        0 on success, 1 when either connection fails.
    """
    print("="*70)
    print("TRYTON STOCK LOCATION IMPORT SCRIPT")
    print("Using Proteus with XML-RPC Connection")
    print("Using Direct PostgreSQL for lat/lon coordinates")
    print("="*70)
    print()
    # Connect to Tryton using XML-RPC
    if not connect_to_tryton():
        return 1
    # Test database connection
    print("Testing PostgreSQL connection...")
    conn = get_db_connection()
    if conn:
        print("✓ PostgreSQL connection successful")
        # Sanity check: the custom lat/lon columns must exist, otherwise the
        # coordinates written later would silently be lost.
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_name = 'stock_location'
                AND column_name IN ('lat', 'lon')
            """)
            columns = cursor.fetchall()
            cursor.close()
            if columns:
                print("✓ Found lat/lon columns in stock_location table:")
                for col in columns:
                    print(f" - {col[0]}: {col[1]}")
            else:
                print("✗ WARNING: lat/lon columns NOT found in stock_location table!")
                print(" Coordinates will not be saved!")
        except Exception as e:
            print(f" Could not verify columns: {e}")
        conn.close()
        print()
    else:
        print("✗ PostgreSQL connection failed")
        print("Coordinates will not be saved!\n")
        return 1
    # Import locations
    import_locations(CSV_FILE_PATH)
    # Verify import
    verify_import()
    return 0
if __name__ == '__main__':
    # Raise SystemExit directly: the site-provided exit() builtin is absent
    # when Python runs with -S, while SystemExit always works.
    raise SystemExit(main())

View File

@@ -0,0 +1,165 @@
import csv
import psycopg2
from datetime import datetime
# Database connection parameters
# NOTE(review): plaintext database credentials in source; externalize to
# environment variables before sharing this script.
DB_CONFIG = {
    'host': '72.61.163.139',
    'port': 5433,
    'database': 'tradon',
    'user': 'postgres',
    'password': 'dsproject'
}
# CSV file path (expects columns: vessel_name, vessel_year, vessel_imo)
CSV_FILE = r'C:\Users\SylvainDUVERNAY\Open Squared\Production - Documents\TRADON Implementation\ITSA\Reference Data\Loaders\Vessels.csv'
def import_vessels():
    """Import vessel data from CSV into the trade_vessel table.

    Reads CSV_FILE (columns: vessel_name, vessel_year, vessel_imo), skips
    rows already present in trade_vessel, inserts the rest in a single
    transaction, and prints a step-by-step progress log.

    Fixes over the previous version:
      * duplicate detection uses IS NOT DISTINCT FROM, so vessels whose
        IMO is NULL are matched (with `=`, NULL = NULL is never true and
        such vessels were re-inserted on every run);
      * the 'NULL' placeholder test is applied to the *stripped* IMO value.
    """
    print("=" * 60)
    print("VESSEL IMPORT PROCESS STARTED")
    print("=" * 60)
    # Initialize connection and cursor so the finally block can test them
    conn = None
    cursor = None
    try:
        # Connect to PostgreSQL database
        print(f"\n[1/4] Connecting to database...")
        print(f" Host: {DB_CONFIG['host']}:{DB_CONFIG['port']}")
        print(f" Database: {DB_CONFIG['database']}")
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        print(" ✓ Database connection established")
        # Read CSV file with UTF-8-BOM encoding to handle Excel-generated CSVs
        print(f"\n[2/4] Reading CSV file...")
        print(f" File: {CSV_FILE}")
        with open(CSV_FILE, 'r', encoding='utf-8-sig') as file:
            csv_reader = csv.DictReader(file)
            # Counters for the import summary
            insert_count = 0
            skip_count = 0
            print(" ✓ CSV file opened successfully")
            print(f"\n[3/4] Processing vessel records...")
            print("-" * 60)
            # Process each row from the CSV file
            for row_num, row in enumerate(csv_reader, start=1):
                vessel_name = row['vessel_name'].strip()
                # Empty string -> None for vessel_year
                vessel_year = row['vessel_year'].strip() or None
                # Normalize IMO: empty or the literal text 'NULL' becomes None.
                # FIX: strip BEFORE the 'NULL' comparison (previously the raw,
                # unstripped value was compared, so ' NULL ' slipped through).
                imo_raw = row['vessel_imo'].strip()
                vessel_imo = imo_raw if imo_raw and imo_raw.upper() != 'NULL' else None
                print(f"\nRow {row_num}: Processing '{vessel_name}'")
                print(f" Year: {vessel_year if vessel_year else 'N/A'}")
                print(f" IMO: {vessel_imo if vessel_imo else 'N/A'}")
                # Duplicate check — null-safe comparison on the IMO column
                cursor.execute("""
                    SELECT id FROM trade_vessel
                    WHERE vessel_name = %s
                      AND vessel_imo IS NOT DISTINCT FROM %s
                """, (vessel_name, vessel_imo))
                existing = cursor.fetchone()
                # Skip insertion if vessel already exists
                if existing:
                    print(f" ⚠ SKIPPED - Duplicate found (ID: {existing[0]})")
                    skip_count += 1
                    continue
                # Insert new vessel record into trade_vessel table
                cursor.execute("""
                    INSERT INTO trade_vessel
                    (vessel_name, vessel_year, vessel_imo, active, create_date, create_uid, write_date, write_uid)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    RETURNING id
                """, (
                    vessel_name,    # Vessel name from CSV
                    vessel_year,    # Year vessel was built
                    vessel_imo,     # IMO number (international maritime identifier)
                    True,           # Set active flag to True
                    datetime.now(), # Record creation timestamp
                    1,              # create_uid — assumes user id 1; TODO confirm
                    datetime.now(), # Record last modification timestamp
                    1               # write_uid — assumes user id 1; TODO confirm
                ))
                # Get the ID of the newly inserted record
                new_id = cursor.fetchone()[0]
                insert_count += 1
                print(f" ✓ INSERTED successfully (New ID: {new_id})")
            print("-" * 60)
            # Commit all inserts in one transaction
            print(f"\n[4/4] Committing transaction to database...")
            conn.commit()
            print(" ✓ Transaction committed successfully")
            # Display import summary statistics
            print("\n" + "=" * 60)
            print("IMPORT SUMMARY")
            print("=" * 60)
            print(f"✓ Records inserted: {insert_count}")
            print(f"⚠ Records skipped: {skip_count}")
            print(f" Total processed: {insert_count + skip_count}")
            print("=" * 60)
    except psycopg2.Error as e:
        # Roll back the transaction on any database error
        print("\n" + "!" * 60)
        print("DATABASE ERROR")
        print("!" * 60)
        if conn:
            conn.rollback()
            print("✓ Transaction rolled back")
        print(f"Error details: {e}")
        print("!" * 60)
    except FileNotFoundError:
        # CSV file does not exist at the configured path
        print("\n" + "!" * 60)
        print("FILE NOT FOUND ERROR")
        print("!" * 60)
        print(f"CSV file not found: {CSV_FILE}")
        print("Please check the file path and try again.")
        print("!" * 60)
    except Exception as e:
        # Any other unexpected error: roll back as well
        print("\n" + "!" * 60)
        print("UNEXPECTED ERROR")
        print("!" * 60)
        if conn:
            conn.rollback()
            print("✓ Transaction rolled back")
        print(f"Error details: {e}")
        print("!" * 60)
    finally:
        # Clean up database resources in every outcome
        print(f"\n[CLEANUP] Closing database connection...")
        if cursor:
            cursor.close()
            print(" ✓ Cursor closed")
        if conn:
            conn.close()
            print(" ✓ Connection closed")
        print("\n" + "=" * 60)
        print("VESSEL IMPORT PROCESS COMPLETED")
        print("=" * 60 + "\n")
# Script entry point: run the import only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    import_vessels()