""" Script di importazione CSV → MySQL per NETGESCON. - Configurazione letta da agent_config.json (stile .env di Laravel) - Aggiornamento automatico schema tabelle - Update solo se i dati cambiano (hash MD5) - Logging dettagliato - Gestione PK (chiavi primarie) opzionale - Codice commentato e scalabile Dipendenze: pip install pandas mysql-connector-python python-dotenv """ import os import json import pandas as pd import mysql.connector import hashlib import logging from datetime import datetime # --- CONFIGURAZIONE --- def load_config(config_path): """Carica la configurazione da agent_config.json""" with open(config_path, encoding="utf-8") as f: return json.load(f) def get_mysql_connection(cfg): """Restituisce una connessione MySQL usando la config""" return mysql.connector.connect( host=cfg["MySQLHost"], port=cfg["MySQLPort"], user=cfg["MySQLUser"], password=cfg["MySQLPassword"], database=cfg["MySQLDatabase"] ) # --- LOGGING --- def setup_logging(log_dir, log_file): """Imposta il logging su file e console""" os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, log_file) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(log_path, encoding="utf-8"), logging.StreamHandler() ] ) # --- UTILS --- def infer_sql_type(val): """Inferisce il tipo SQL da un valore di esempio""" try: int(val) return "INT" except: try: float(val) return "FLOAT" except: if isinstance(val, str) and len(val) > 255: return "TEXT" return "VARCHAR(255)" def get_existing_columns(cursor, table): """Restituisce la lista delle colonne esistenti per una tabella""" cursor.execute(f"SHOW COLUMNS FROM `{table}`") return [row[0] for row in cursor.fetchall()] def get_table_pk(table): """ Restituisce la chiave primaria per la tabella. Puoi estendere questa funzione per mappare le PK delle tue tabelle. """ pk_map = { "Stabili": ["id_stabile"], "Fornitori": ["id_fornitore"], "condomin": ["id_cond"], "rate": ["id_rate"], "incassi": ["ID_incasso"], # ...aggiungi qui altre tabelle e le loro PK... } return pk_map.get(table, []) def hash_row(row, fields): """Calcola l'hash MD5 di una riga (solo sui campi dati)""" s = "|".join(str(row[f]) for f in fields) return hashlib.md5(s.encode("utf-8")).hexdigest() def table_exists(cursor, table): cursor.execute(f"SHOW TABLES LIKE '{table}'") return cursor.fetchone() is not None # --- IMPORTAZIONE --- def create_or_update_table(cursor, table, df): """ Crea la tabella se non esiste, oppure aggiunge nuove colonne se necessario. """ columns = [] for col in df.columns: sample_val = df[col].dropna().iloc[0] if not df[col].dropna().empty else "" columns.append((col, infer_sql_type(sample_val))) if not table_exists(cursor, table): # Crea tabella con PK se definita pk = get_table_pk(table) fields = ", ".join([f"`{c}` {t}" for c, t in columns]) pk_sql = f", PRIMARY KEY ({', '.join([f'`{k}`' for k in pk])})" if pk else "" sql = f"CREATE TABLE `{table}` ({fields}{pk_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4" cursor.execute(sql) logging.info(f"Tabella '{table}' creata.") else: # Aggiorna tabella se ci sono nuovi campi existing = get_existing_columns(cursor, table) for c, t in columns: if c not in existing: cursor.execute(f"ALTER TABLE `{table}` ADD COLUMN `{c}` {t}") logging.info(f"Tabella '{table}' aggiornata: aggiunta colonna '{c}'.") def import_csv_to_mysql(cfg, csv_path, table, conn, hash_dir): """ Importa un CSV in una tabella MySQL, aggiornando solo se i dati cambiano. """ try: df = pd.read_csv(csv_path, dtype=str, encoding=cfg.get("CSVEncoding", "utf-8")) df = df.fillna("") cursor = conn.cursor() create_or_update_table(cursor, table, df) pk = get_table_pk(table) # Calcola hash del file per evitare update inutili hash_file = os.path.join(hash_dir, f"{table}.md5") file_hash = hashlib.md5(open(csv_path, "rb").read()).hexdigest() if os.path.exists(hash_file): with open(hash_file, "r") as f: last_hash = f.read().strip() if last_hash == file_hash: logging.info(f"{table}: dati invariati, nessun update.") cursor.close() return # Inserimento/aggiornamento dati cols = ", ".join([f"`{c}`" for c in df.columns]) vals = ", ".join(["%s"] * len(df.columns)) if pk: # Update se PK esiste, altrimenti insert update_sql = ", ".join([f"`{c}`=VALUES(`{c}`)" for c in df.columns if c not in pk]) sql = f"INSERT INTO `{table}` ({cols}) VALUES ({vals}) ON DUPLICATE KEY UPDATE {update_sql}" else: sql = f"REPLACE INTO `{table}` ({cols}) VALUES ({vals})" for row in df.itertuples(index=False, name=None): cursor.execute(sql, row) conn.commit() # Salva hash os.makedirs(hash_dir, exist_ok=True) with open(hash_file, "w") as f: f.write(file_hash) logging.info(f"{table}: dati importati/aggiornati ({len(df)} righe).") cursor.close() except Exception as e: logging.error(f"Errore importazione {table}: {e}") def scan_and_import(cfg): """ Scansiona ricorsivamente la InputDirectory e importa tutti i CSV in MySQL. """ input_dir = cfg["InputDirectory"] hash_dir = cfg.get("HashDirectory", "./hash") conn = get_mysql_connection(cfg) for root, dirs, files in os.walk(input_dir): for file in files: if file.lower().endswith(".csv"): csv_path = os.path.join(root, file) table = os.path.splitext(file)[0] logging.info(f"Import {csv_path} -> {table}") import_csv_to_mysql(cfg, csv_path, table, conn, hash_dir) conn.close() # --- MAIN --- if __name__ == "__main__": # Percorso relativo al file di config (stile Laravel .env) config_path = os.path.join(os.path.dirname(__file__), "agent_config.json") cfg = load_config(config_path) setup_logging(cfg["LogDirectory"], cfg.get("LogFile", "agent.log")) logging.info("=== Avvio importazione batch NETGESCON ===") scan_and_import(cfg) logging.info("=== Fine importazione batch NETGESCON ===")