""" Script di importazione CSV → MySQL per NETGESCON. - Configurazione letta da agent_config.json (stile .env di Laravel) - Aggiornamento automatico schema tabelle - Update solo se i dati cambiano (hash MD5) - Logging dettagliato - Gestione PK (chiavi primarie) opzionale - Controllo relazioni tra tabelle chiave - Generazione report struttura archivi - Codice commentato e scalabile Dipendenze: pip install pandas mysql-connector-python """ import os import json import pandas as pd import mysql.connector import hashlib import logging from logging.handlers import RotatingFileHandler import requests # --- CONFIGURAZIONE --- def load_config(config_path): """Carica la configurazione da agent_config.json""" with open(config_path, encoding="utf-8") as f: return json.load(f) def get_mysql_connection(cfg): """Restituisce una connessione MySQL usando la config""" return mysql.connector.connect( host=cfg["MySQLHost"], port=cfg["MySQLPort"], user=cfg["MySQLUser"], password=cfg["MySQLPassword"], database=cfg["MySQLDatabase"] ) # --- LOGGING --- class ErrorFilter(logging.Filter): def filter(self, record): return record.levelno >= logging.WARNING def setup_logging(log_dir, log_file): """Imposta il logging su file, console e file separato per errori/warning""" os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, log_file) error_log_path = os.path.join(log_dir, "agent_errors.log") # Handler per tutti i log file_handler = RotatingFileHandler(log_path, maxBytes=5*1024*1024, backupCount=3, encoding="utf-8") file_handler.setLevel(logging.INFO) # Handler per errori/warning error_handler = RotatingFileHandler(error_log_path, maxBytes=1*1024*1024, backupCount=3, encoding="utf-8") error_handler.setLevel(logging.WARNING) error_handler.addFilter(ErrorFilter()) # Handler console console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[file_handler, error_handler, console_handler] ) # --- UTILS --- def infer_sql_type(val): """Inferisce il tipo SQL da un valore di esempio""" try: int(val) return "INT" except: try: float(val) return "FLOAT" except: if isinstance(val, str) and len(val) > 255: return "TEXT" return "VARCHAR(255)" def get_existing_columns(cursor, table): """Restituisce la lista delle colonne esistenti per una tabella""" cursor.execute(f"SHOW COLUMNS FROM `{table}`") return [row[0] for row in cursor.fetchall()] def get_table_pk(table): """ Restituisce la chiave primaria per la tabella. Puoi estendere questa funzione per mappare le PK delle tue tabelle. """ pk_map = { "stabili": ["id_stabile"], "fornitori": ["id_fornitore"], "condomin": ["id_cond"], "rate": ["id_rate"], "incassi": ["ID_incasso"], "emes_det": ["n_emissione", "anno_emissione", "n_ricevuta"], # esempio, personalizza secondo i tuoi dati # ...aggiungi qui altre tabelle e le loro PK... } return pk_map.get(table.lower(), []) def table_exists(cursor, table): cursor.execute(f"SHOW TABLES LIKE '{table}'") return cursor.fetchone() is not None def get_mysql_type(col_name, sample_value, table_name=None): col = col_name.lower() tabelle_grandi = [ "singolo_stabile", "sistema", "condomin", "stabili", "stabili_stabili", "singolo_anno_sistema", "singolo_anno_condomin", "em_009_1_singolo_stabile" ] if table_name and table_name.lower() in tabelle_grandi: # Forza tutto a TEXT tranne i numerici e le chiavi primarie if col in ["id_stabile", "id_cond", "id_fornitore", "id_rate", "id_incasso"]: return "INT" try: float(sample_value) return "DOUBLE" except: return "TEXT" if col in ["p_iva", "cod_fisc", "codice", "num_ccp", "int", "interno", "id_fornitore", "cod", "n_mese"]: return "VARCHAR(20)" if col in ["frase", "note", "descrizione", "memo"]: return "TEXT" if col in ["periodo"]: return "VARCHAR(20)" if isinstance(sample_value, str) and len(sample_value) > 100: return "TEXT" try: float(sample_value) return "DOUBLE" except: return "VARCHAR(255)" # --- IMPORTAZIONE --- def create_or_update_table(cursor, table, df): """ Crea la tabella se non esiste, oppure aggiunge nuove colonne se necessario. """ columns = [] for col in df.columns: sample_val = df[col].dropna().iloc[0] if not df[col].dropna().empty else "" columns.append((col, get_mysql_type(col, sample_val, table))) # <-- passa anche il nome tabella! if not table_exists(cursor, table): # Crea tabella con PK se definita pk = get_table_pk(table) fields = ", ".join([f"`{c}` {t}" for c, t in columns]) pk_sql = f", PRIMARY KEY ({', '.join([f'`{k}`' for k in pk])})" if pk else "" sql = f"CREATE TABLE `{table.lower()}` ({fields}{pk_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4" # Nome tabella in minuscolo cursor.execute(sql) logging.info(f"Tabella '{table}' creata con colonne: {[c for c, _ in columns]}") else: # Aggiorna tabella se ci sono nuovi campi existing = get_existing_columns(cursor, table) for c, t in columns: if c not in existing: cursor.execute(f"ALTER TABLE `{table.lower()}` ADD COLUMN `{c}` {t}") # Nome tabella in minuscolo logging.info(f"Tabella '{table}' aggiornata: aggiunta colonna '{c}' ({t})") logging.info(f"Tabella '{table}' colonne esistenti: {existing}") logging.info(f"Tabella '{table}' colonne CSV: {[c for c, _ in columns]}") def import_csv_to_mysql(cfg, csv_path, table, conn, hash_dir): print(f"[DEBUG] Import {csv_path} -> {table}") try: df = pd.read_csv(csv_path, dtype=str, encoding=cfg.get("CSVEncoding", "utf-8")) df = df.fillna("") df.columns = [col.strip() for col in df.columns] campi_da_escludere = ["campo1", "campo2"] df = df[[c for c in df.columns if c not in campi_da_escludere]] cursor = conn.cursor() table_name_lower = table.lower() create_or_update_table(cursor, table_name_lower, df) # --- FIX ROW SIZE --- tabelle_grandi = ["singolo_stabile", "sistema", "condomin", "stabili"] if table_name_lower in tabelle_grandi: fix_table_row_size(cursor, table_name_lower) # --- FINE FIX --- pk = get_table_pk(table_name_lower) hash_file = os.path.join(hash_dir, f"{table}.md5") file_hash = hashlib.md5(open(csv_path, "rb").read()).hexdigest() if os.path.exists(hash_file): with open(hash_file, "r") as f: last_hash = f.read().strip() if last_hash == file_hash: logging.info(f"{table}: dati invariati, nessun update.") return cols = ", ".join([f"`{c}`" for c in df.columns]) vals = ", ".join(["%s"] * len(df.columns)) if pk: update_sql = ", ".join([f"`{c}`=VALUES(`{c}`)" for c in df.columns if c not in pk]) sql = f"INSERT INTO `{table_name_lower}` ({cols}) VALUES ({vals}) ON DUPLICATE KEY UPDATE {update_sql}" else: sql = f"REPLACE INTO `{table_name_lower}` ({cols}) VALUES ({vals})" for idx, row in enumerate(df.itertuples(index=False, name=None)): try: cursor.execute(sql, row) except Exception as e: logging.error(f"[IMPORT ERROR] Tabella: {table}, Riga: {idx+1}, Valori: {row}, Errore: {e}") conn.commit() os.makedirs(hash_dir, exist_ok=True) with open(hash_file, "w", encoding="utf-8") as f: f.write(file_hash) logging.info(f"{table}: dati importati/aggiornati ({len(df)} righe).") except Exception as e: logging.error(f"Errore importazione {table} da {csv_path}: {e}") finally: cursor.close() def scan_and_import(cfg): print(">>> scan_and_import chiamata", flush=True) print("=== INIZIO SCANSIONE ===") input_dir = cfg["InputDirectory"] print(f"[DEBUG] InputDirectory: {input_dir}") hash_dir = cfg.get("HashDirectory", "./hash") os.makedirs(hash_dir, exist_ok=True) conn = get_mysql_connection(cfg) # --- CONTROLLO E CREAZIONE COLONNA/TRIGGER STABILI --- cur = conn.cursor() cur.execute(""" SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'stabili' AND column_name = 'cartella_condominio' """) if cur.fetchone()[0] == 0: cur.execute("ALTER TABLE stabili ADD COLUMN cartella_condominio VARCHAR(64)") cur.execute("UPDATE stabili SET cartella_condominio = nome_directory") cur.execute("DROP TRIGGER IF EXISTS stabili_clone_cartella") cur.execute(""" CREATE TRIGGER stabili_clone_cartella BEFORE INSERT ON stabili FOR EACH ROW SET NEW.cartella_condominio = NEW.nome_directory """) cur.execute("DROP TRIGGER IF EXISTS stabili_clone_cartella_update") cur.execute(""" CREATE TRIGGER stabili_clone_cartella_update BEFORE UPDATE ON stabili FOR EACH ROW SET NEW.cartella_condominio = NEW.nome_directory """) conn.commit() cur.close() # --- FINE CONTROLLO STABILI --- archivio_report = [] csv_trovati_totale = [] for root, dirs, files in os.walk(input_dir): print(f"[DEBUG] SCANSIONE CARTELLA: {root}", flush=True) print(f"[DEBUG] Sottocartelle: {dirs}", flush=True) print(f"[DEBUG] File trovati: {files}", flush=True) csv_trovati_in_questa_cartella = [] for file in files: print(f"[DEBUG] Analizzo file: {file}") if file.lower().endswith(".csv"): csv_path = os.path.join(root, file) parent_folder = os.path.basename(os.path.dirname(csv_path)) table_name_original = f"{parent_folder}_{os.path.splitext(file)[0]}" table_name_for_db = table_name_original.lower() print(f"[DEBUG] CSV trovato: {csv_path} -> Tabella: {table_name_for_db}") logging.info(f"Trovato CSV: {csv_path} -> Tabella DB: {table_name_for_db} (Originale: {table_name_original})") csv_trovati_in_questa_cartella.append(file) csv_trovati_totale.append(csv_path) try: df = pd.read_csv(csv_path, dtype=str, encoding=cfg.get("CSVEncoding", "utf-8")) df.columns = [c.strip() for c in df.columns] archivio_report.append({ "source": csv_path, "table_original_name": table_name_original, "table_db_name": table_name_for_db, "fields": list(df.columns), "rows": len(df) }) except Exception as e: logging.error(f"Errore lettura {csv_path} per report: {e}") import_csv_to_mysql(cfg, csv_path, table_name_original, conn, hash_dir) if csv_trovati_in_questa_cartella: print(f"[DEBUG] CSV trovati in '{root}': {len(csv_trovati_in_questa_cartella)}") else: print(f"[DEBUG] Nessun CSV trovato in '{root}'.") conn.close() output_dir_report = cfg.get("OutputDirectory", ".") os.makedirs(output_dir_report, exist_ok=True) report_path = os.path.join(output_dir_report, "report_archivio.json") try: with open(report_path, "w", encoding="utf-8") as f: json.dump(archivio_report, f, indent=2, ensure_ascii=False) logging.info(f"Report struttura archivi generato in {report_path}") except Exception as e: logging.error(f"Errore durante il salvataggio del report {report_path}: {e}") logging.info(f"Totale CSV analizzati per il report: {len(archivio_report)}") print(f"\nTotale CSV trovati in tutte le cartelle: {len(csv_trovati_totale)}") for f in csv_trovati_totale: print(f" - {f}") # --- CONTROLLO RELAZIONI --- def check_relations(cfg): """ Controllo preliminare delle relazioni tra le tabelle chiave. Stampa e logga eventuali anomalie (es. condomini senza stabile, rate senza condominio, incassi senza rate). """ try: conn = get_mysql_connection(cfg) cur = conn.cursor() # Controlla se la colonna esiste cur.execute(""" SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'condomin' AND column_name = 'cartella_condominio' """) if cur.fetchone()[0] == 0: print("[INFO] La colonna 'cartella_condominio' non esiste in 'condomin', salto controllo relazioni condomini/stabili.") logging.info("Colonna 'cartella_condominio' assente in 'condomin', controllo relazioni saltato.") return # 1. Condomini senza stabile associato cur.execute(""" SELECT c.id_cond, c.cartella_condominio FROM condomin c LEFT JOIN stabili s ON c.cartella_condominio = s.nome_directory WHERE s.nome_directory IS NULL LIMIT 10 """) orfani = cur.fetchall() if orfani: print("[ATTENZIONE] Condomini senza stabile associato:", orfani) logging.warning(f"Condomini senza stabile associato: {orfani}") else: print("[OK] Tutti i condomini hanno uno stabile associato.") logging.info("Tutti i condomini hanno uno stabile associato.") # 2. Rate senza condominio associato cur.execute(""" SELECT r.id_rate, r.cartella_condominio FROM rate r LEFT JOIN condomin c ON r.cartella_condominio = c.cartella_condominio WHERE c.cartella_condominio IS NULL LIMIT 10 """) orfani = cur.fetchall() if orfani: print("[ATTENZIONE] Rate senza condominio associato:", orfani) logging.warning(f"Rate senza condominio associato: {orfani}") else: print("[OK] Tutte le rate hanno un condominio associato.") logging.info("Tutte le rate hanno un condominio associato.") # 3. Incassi senza rata associata cur.execute(""" SELECT i.ID_incasso, i.cartella_condominio, i.id_rate FROM incassi i LEFT JOIN rate r ON i.cartella_condominio = r.cartella_condominio AND i.id_rate = r.id_rate WHERE r.id_rate IS NULL LIMIT 10 """) orfani = cur.fetchall() if orfani: print("[ATTENZIONE] Incassi senza rata associata:", orfani) logging.warning(f"Incassi senza rata associata: {orfani}") else: print("[OK] Tutti gli incassi hanno una rata associata.") logging.info("Tutti gli incassi hanno una rata associata.") # 4. Fornitori senza stabile associato (se serve) cur.execute(""" SELECT f.id_fornitore, f.cartella_condominio FROM fornitori f LEFT JOIN stabili s ON f.cartella_condominio = s.nome_directory WHERE s.nome_directory IS NULL LIMIT 10 """) orfani = cur.fetchall() if orfani: print("[ATTENZIONE] Fornitori senza stabile associato:", orfani) logging.warning(f"Fornitori senza stabile associato: {orfani}") else: print("[OK] Tutti i fornitori hanno uno stabile associato.") logging.info("Tutti i fornitori hanno uno stabile associato.") cur.close() conn.close() except Exception as e: print("[ERRORE] Controllo relazioni:", e) logging.error(f"Errore controllo relazioni: {e}") # --- MAIN --- if __name__ == "__main__": BASE_DIR = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(BASE_DIR, "agent_config.json") cfg = load_config(config_path) setup_logging(cfg["LogDirectory"], cfg.get("LogFile", "agent.log")) if cfg.get("Debug", False): logging.getLogger().setLevel(logging.DEBUG) logging.info("=== Avvio importazione batch NETGESCON ===") print("CONFIG:", cfg, flush=True) print("InputDirectory:", cfg["InputDirectory"], flush=True) scan_and_import(cfg) check_relations(cfg) # --- SINCRONIZZAZIONE STABILI --- # Carica il token dell'amministratore (puoi leggerlo da file o config) token = cfg["AdminToken"] # oppure leggi da file/token.json sincronizza_stabili(cfg, token) logging.info("=== Fine sincronizzazione stabili su nuovo gestionale ===") def sincronizza_stabili(cfg, token): endpoint = cfg["RestEndpointBase"].rstrip("/") + "/api/v1/import/condominio" hash_file = os.path.join(cfg.get("HashDirectory", "."), "hash_stabili.json") hash_storico = carica_hash_storico(hash_file) conn = mysql.connector.connect( host=cfg["MySQLHost"], user=cfg["MySQLUser"], password=cfg["MySQLPassword"], database=cfg["MySQLDatabase"] ) cur = conn.cursor(dictionary=True) cur.execute(""" SELECT id_stabile, cod_stabile, denominazione, indirizzo, cap, citta, pr, codice_fisc, cf_amministratore, note1 FROM stabili """) stabili = cur.fetchall() cur.close() conn.close() modificati = 0 for stabile in stabili: id_stabile = str(stabile["id_stabile"]) hash_attuale = calcola_hash(stabile) if hash_storico.get(id_stabile) != hash_attuale: headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json" } try: response = requests.post(endpoint, headers=headers, json=stabile, timeout=10) print(f"[API] Stabile {id_stabile} -> Status: {response.status_code}, Risposta: {response.text}") hash_storico[id_stabile] = hash_attuale modificati += 1 except Exception as e: print(f"[API ERROR] Stabile {id_stabile} -> Errore: {e}") salva_hash_storico(hash_file, hash_storico) print(f"[SYNC] Stabili modificati/inviati: {modificati} su {len(stabili)}") def sincronizza_tabella(cfg, tabella, campo_id, endpoint_path, campi_payload): import mysql.connector conn = mysql.connector.connect( host=cfg["MySQLHost"], user=cfg["MySQLUser"], password=cfg["MySQLPassword"], database=cfg["MySQLDatabase"] ) assicurati_campi_sync(conn, tabella) cur = conn.cursor(dictionary=True) cur.execute(f"SELECT {campo_id}, {', '.join(campi_payload)}, hash_sync, sincronizzato FROM {tabella}") records = cur.fetchall() endpoint = cfg["RestEndpointBase"].rstrip("/") + endpoint_path token = cfg["AdminToken"] modificati = 0 for rec in records: record_id = rec[campo_id] payload = {k: rec[k] for k in campi_payload} hash_attuale = calcola_hash(payload) if rec["hash_sync"] != hash_attuale or not rec["sincronizzato"]: headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json" } try: response = requests.post(endpoint, headers=headers, json=payload, timeout=10) print(f"[API] {tabella} {record_id} -> Status: {response.status_code}, Risposta: {response.text}") if response.status_code in (200, 201): cur2 = conn.cursor() cur2.execute( f"UPDATE {tabella} SET hash_sync=%s, sincronizzato=1 WHERE {campo_id}=%s", (hash_attuale, record_id) ) conn.commit() cur2.close() modificati += 1 except Exception as e: print(f"[API ERROR] {tabella} {record_id} -> Errore: {e}") cur.close() conn.close() print(f"[SYNC] {tabella}: modificati/inviati {modificati} su {len(records)}")