# helpers/migration_mapping.py from datetime import date import psycopg2 from psycopg2.extras import execute_values import logging logger = logging.getLogger(__name__) class MigrationMapper: """Handle migration mapping records between source system and Tryton""" def __init__(self, db_config): """ Initialize with database configuration Args: db_config: dict with keys: host, port, database, user, password """ self.db_config = db_config self.connection = None def __enter__(self): """Context manager entry - establish database connection""" self.connection = psycopg2.connect(**self.db_config) return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - close database connection""" if self.connection: if exc_type is None: self.connection.commit() else: self.connection.rollback() self.connection.close() def save_mapping(self, object_type, source_id, tryton_model, tryton_id, recon_key): """ Save a single migration mapping record Args: object_type: Type of object being migrated (e.g., 'sale_contract') source_id: ID from source system tryton_model: Tryton model name (e.g., 'sale.sale') tryton_id: Tryton record ID recon_key: Reconciliation key for matching (e.g., contract number) """ mappings = [{ 'object_type': object_type, 'source_id': source_id, 'tryton_model': tryton_model, 'tryton_id': tryton_id, 'recon_key': recon_key }] self.save_mappings_batch(mappings) def save_mappings_batch(self, mappings): """ Save multiple migration mapping records in batch Args: mappings: List of dicts with keys: - object_type: Type of object being migrated - source_id: ID from source system - tryton_model: Tryton model name - tryton_id: Tryton record ID - recon_key: Reconciliation key for matching """ if not mappings: logger.warning("No mappings to save") return cursor = self.connection.cursor() # Prepare data for batch insert values = [ ( [mapping['object_type']], # Array with single element [mapping['source_id']], # Array with single element [mapping['tryton_model']], # Array with single element mapping['tryton_id'], [mapping['recon_key']], # Array with single element date.today() ) for mapping in mappings ] try: # Use execute_values for efficient batch insert # ON CONFLICT DO NOTHING prevents duplicates execute_values( cursor, """ INSERT INTO public.os_migration_mapping (object_type, source_id, tryton_model, tryton_id, recon_key, write_date) VALUES %s ON CONFLICT DO NOTHING """, values, template="(%s, %s, %s, %s, %s, %s)" ) logger.info(f"Saved {len(mappings)} migration mapping records") except Exception as e: logger.error(f"Error saving migration mappings: {e}") raise finally: cursor.close() def get_tryton_id(self, object_type, source_id): """ Retrieve Tryton ID for a given source system ID Args: object_type: Type of object source_id: ID from source system Returns: int: Tryton ID or None if not found """ cursor = self.connection.cursor() try: cursor.execute( """ SELECT tryton_id FROM public.os_migration_mapping WHERE %s = ANY(object_type) AND %s = ANY(source_id) ORDER BY write_date DESC LIMIT 1 """, (object_type, source_id) ) result = cursor.fetchone() return result[0] if result else None finally: cursor.close() def get_source_id(self, object_type, tryton_id): """ Retrieve source system ID for a given Tryton ID Args: object_type: Type of object tryton_id: Tryton record ID Returns: str: Source ID or None if not found """ cursor = self.connection.cursor() try: cursor.execute( """ SELECT source_id[1] FROM public.os_migration_mapping WHERE %s = ANY(object_type) AND tryton_id = %s ORDER BY write_date DESC LIMIT 1 """, (object_type, tryton_id) ) result = cursor.fetchone() return result[0] if result else None finally: cursor.close() def get_mapping_by_recon_key(self, object_type, recon_key): """ Retrieve mapping by reconciliation key Args: object_type: Type of object recon_key: Reconciliation key Returns: dict: Mapping record or None if not found """ cursor = self.connection.cursor() try: cursor.execute( """ SELECT object_type[1] as object_type, source_id[1] as source_id, tryton_model[1] as tryton_model, tryton_id, recon_key[1] as recon_key, write_date FROM public.os_migration_mapping WHERE %s = ANY(object_type) AND %s = ANY(recon_key) ORDER BY write_date DESC LIMIT 1 """, (object_type, recon_key) ) result = cursor.fetchone() if result: return { 'object_type': result[0], 'source_id': result[1], 'tryton_model': result[2], 'tryton_id': result[3], 'recon_key': result[4], 'write_date': result[5] } return None finally: cursor.close() def delete_mappings_by_source(self, object_type, source_ids): """ Delete mappings by source IDs (useful for re-import) Args: object_type: Type of object source_ids: List of source IDs to delete Returns: int: Number of records deleted """ if not source_ids: return 0 cursor = self.connection.cursor() try: # Delete records where source_id matches any in the list cursor.execute( """ DELETE FROM public.os_migration_mapping WHERE %s = ANY(object_type) AND source_id[1] = ANY(%s) """, (object_type, source_ids) ) deleted_count = cursor.rowcount logger.info(f"Deleted {deleted_count} migration mapping records") return deleted_count finally: cursor.close() def get_all_mappings(self, object_type=None): """ Retrieve all mappings, optionally filtered by object type Args: object_type: Optional object type filter Returns: list: List of mapping dicts """ cursor = self.connection.cursor() try: if object_type: cursor.execute( """ SELECT object_type[1] as object_type, source_id[1] as source_id, tryton_model[1] as tryton_model, tryton_id, recon_key[1] as recon_key, write_date FROM public.os_migration_mapping WHERE %s = ANY(object_type) ORDER BY write_date DESC """, (object_type,) ) else: cursor.execute( """ SELECT object_type[1] as object_type, source_id[1] as source_id, tryton_model[1] as tryton_model, tryton_id, recon_key[1] as recon_key, write_date FROM public.os_migration_mapping ORDER BY write_date DESC """ ) results = cursor.fetchall() return [ { 'object_type': row[0], 'source_id': row[1], 'tryton_model': row[2], 'tryton_id': row[3], 'recon_key': row[4], 'write_date': row[5] } for row in results ] finally: cursor.close() # Simplified standalone function for quick integration def save_migration_mapping(db_config, object_type, source_id, tryton_model, tryton_id, recon_key): """ Standalone function to save a single migration mapping Args: db_config: dict with database connection parameters object_type: Type of object being migrated source_id: ID from source system tryton_model: Tryton model name tryton_id: Tryton record ID recon_key: Reconciliation key for matching """ with MigrationMapper(db_config) as mapper: mapper.save_mapping(object_type, source_id, tryton_model, tryton_id, recon_key)