import os
import sys
import json
import hashlib
import logging
from logging.handlers import RotatingFileHandler

import pandas as pd
import mysql.connector
import requests


# --- CONFIGURATION AND LOGGING ---

def load_config(config_path):
    """Load the JSON configuration file and return it as a dict.

    Raises FileNotFoundError if the file is missing (handled by the caller).
    """
    with open(config_path, encoding="utf-8") as f:
        return json.load(f)


def setup_logging(log_dir, log_file, debug=False):
    """Configure root logging to a rotating file plus the console.

    The log directory is created if needed; file rotates at 5 MiB with
    3 backups.  `debug=True` lowers the level to DEBUG.
    """
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, log_file)
    logging.basicConfig(
        level=logging.DEBUG if debug else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            RotatingFileHandler(log_path, maxBytes=5 * 1024 * 1024,
                                backupCount=3, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )


def get_mysql_connection(cfg):
    """Open and return a MySQL connection from the config values."""
    return mysql.connector.connect(
        host=cfg["MySQLHost"],
        port=cfg["MySQLPort"],
        user=cfg["MySQLUser"],
        password=cfg["MySQLPassword"],
        database=cfg["MySQLDatabase"],
    )


# --- DATABASE AND HASH UTILS ---

def add_sync_columns_if_not_exist(cursor, table):
    """Ensure the sync bookkeeping columns (`hash_sync`, `sincronizzato`)
    exist on `table`, adding them via ALTER TABLE when missing."""
    cursor.execute(f"SHOW COLUMNS FROM `{table}` LIKE 'hash_sync'")
    if not cursor.fetchone():
        cursor.execute(f"ALTER TABLE `{table}` ADD COLUMN `hash_sync` VARCHAR(32) NULL")
        logging.info(f"Aggiunta colonna 'hash_sync' a tabella '{table}'")
    cursor.execute(f"SHOW COLUMNS FROM `{table}` LIKE 'sincronizzato'")
    if not cursor.fetchone():
        cursor.execute(f"ALTER TABLE `{table}` ADD COLUMN `sincronizzato` BOOLEAN NOT NULL DEFAULT FALSE")
        logging.info(f"Aggiunta colonna 'sincronizzato' a tabella '{table}'")


def calculate_hash(data_dict):
    """Return a stable MD5 fingerprint (hex string) of `data_dict`.

    None values are dropped and keys are sorted before serialization so
    the hash only changes when meaningful payload content changes.
    NOTE: MD5 is used here as a cheap change detector, not for security.
    """
    filtered_dict = {k: v for k, v in data_dict.items() if v is not None}
    ordered_data = json.dumps(filtered_dict, sort_keys=True, default=str).encode('utf-8')
    return hashlib.md5(ordered_data).hexdigest()


# --- CSV -> MYSQL IMPORT ---

def import_csv_to_mysql(cfg, csv_path, table, conn):
    """Create/extend `table` from a CSV file and upsert all of its rows.

    The schema is derived from the CSV header; columns whose longest
    value exceeds 255 characters become TEXT, the rest VARCHAR(255).
    If a primary key is configured in SyncMappings the rows are upserted
    with ON DUPLICATE KEY UPDATE, otherwise REPLACE INTO is used.
    Errors are logged and the transaction is rolled back.
    """
    cursor = None
    try:
        df = pd.read_csv(csv_path, dtype=str,
                         encoding=cfg.get("CSVEncoding", "utf-8")).fillna("")
        df.columns = [col.strip() for col in df.columns]
        cursor = conn.cursor()
        # Table creation/update
        pk_map = {m['table']: m['pk'] for m in cfg.get("SyncMappings", [])}
        pk = pk_map.get(table)
        cols_sql_parts = []
        for c in df.columns:
            # max() is NaN on an empty frame; NaN > 255 is False, so the
            # column safely defaults to VARCHAR(255) in that case.
            col_type = "TEXT" if df[c].str.len().max() > 255 else "VARCHAR(255)"
            cols_sql_parts.append(f"`{c}` {col_type}")
        pk_sql = f", PRIMARY KEY (`{pk}`)" if pk else ""
        cursor.execute(f"CREATE TABLE IF NOT EXISTS `{table}` ({', '.join(cols_sql_parts)}{pk_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
        add_sync_columns_if_not_exist(cursor, table)
        # Insert/update logic
        cols = ", ".join([f"`{c}`" for c in df.columns])
        vals = ", ".join(["%s"] * len(df.columns))
        if pk:
            update_sql = ", ".join([f"`{c}`=VALUES(`{c}`)" for c in df.columns if c != pk])
            sql = f"INSERT INTO `{table}` ({cols}) VALUES ({vals}) ON DUPLICATE KEY UPDATE {update_sql}"
        else:
            # NOTE(review): without a unique key, REPLACE behaves like a
            # plain INSERT — re-importing the same file duplicates rows.
            sql = f"REPLACE INTO `{table}` ({cols}) VALUES ({vals})"
        cursor.executemany(sql, df.to_records(index=False).tolist())
        conn.commit()
        logging.info(f"Importate/Aggiornate {len(df)} righe da '{os.path.basename(csv_path)}' nella tabella '{table}'.")
    except Exception as e:
        # Fix: roll back so a failed batch doesn't stay pending on the
        # shared connection used by later imports.
        conn.rollback()
        logging.error(f"Errore durante importazione di {csv_path} in {table}: {e}", exc_info=True)
    finally:
        # Fix: the original leaked the cursor when an exception fired
        # before the success-path close().
        if cursor is not None:
            cursor.close()


def scan_and_import(cfg, conn):
    """Walk InputDirectory recursively and import every *.csv found.

    Each file becomes a table named after the file (lowercased, no
    extension).  A missing input directory is logged and skipped.
    """
    logging.info("--- Inizio scansione e importazione CSV in MySQL ---")
    input_dir = cfg["InputDirectory"]
    if not os.path.isdir(input_dir):
        logging.error(f"La cartella di input '{input_dir}' non esiste.")
        return
    # os.walk automatically handles subfolders for years/condominiums
    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.lower().endswith(".csv"):
                csv_path = os.path.join(root, file)
                # The table name is the file name without extension
                table_name = os.path.splitext(file)[0].lower()
                import_csv_to_mysql(cfg, csv_path, table_name, conn)
    logging.info("--- Fine scansione e importazione CSV ---")


# --- MYSQL -> API SYNCHRONIZATION ---

def send_to_api(endpoint, token, payload):
    """POST `payload` as JSON to `endpoint` with a Bearer token.

    Returns True on a 2xx response, False on any request error (logged,
    including the response body when the server answered).
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }
    try:
        response = requests.post(endpoint, headers=headers, json=payload, timeout=15)
        response.raise_for_status()
        logging.info(f"Dati inviati a {endpoint}. Risposta: {response.status_code}")
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f"Errore API per {endpoint}: {e}")
        # e.response is None for connection-level failures
        if e.response is not None:
            logging.error(f"Dettagli errore API: {e.response.text}")
        return False


def synchronize_table(cfg, mapping, conn):
    """Push changed rows of one mapped table to the remote API.

    A row is sent when its recomputed payload hash differs from the
    stored `hash_sync` or it was never synchronized; on success the hash
    and flag are persisted.  A missing table (MySQL error 1146) is
    logged as a warning and skipped.
    """
    table = mapping['table']
    pk_col = mapping['pk']
    endpoint = cfg['ApiBaseUrl'] + mapping['endpoint']
    token = cfg['AdminToken']
    payload_fields = mapping['payload_fields']
    logging.info(f"--- Inizio sincronizzazione tabella: {table} ---")
    cursor = conn.cursor(dictionary=True)
    try:
        db_fields = list(payload_fields.keys())
        # set() deduplicates; rows come back as dicts so column order
        # does not matter.
        select_fields = list(set(db_fields + [pk_col, 'hash_sync', 'sincronizzato']))
        cursor.execute(f"SELECT {', '.join(f'`{f}`' for f in select_fields)} FROM `{table}`")
        records = cursor.fetchall()
        updated_count = 0
        for record in records:
            payload = {api_key: record.get(db_key) for db_key, api_key in payload_fields.items()}
            current_hash = calculate_hash(payload)
            if current_hash != record.get('hash_sync') or not record.get('sincronizzato'):
                logging.info(f"Sincronizzazione record {table} con PK {record[pk_col]} (hash cambiato o mai sincronizzato).")
                if send_to_api(endpoint, token, payload):
                    update_cursor = conn.cursor()
                    sql_update = f"UPDATE `{table}` SET hash_sync = %s, sincronizzato = TRUE WHERE `{pk_col}` = %s"
                    update_cursor.execute(sql_update, (current_hash, record[pk_col]))
                    conn.commit()
                    update_cursor.close()
                    updated_count += 1
        logging.info(f"--- Fine sincronizzazione {table}. Record inviati/aggiornati: {updated_count}/{len(records)} ---")
    except mysql.connector.Error as err:
        if err.errno == 1146:  # Table doesn't exist
            logging.warning(f"Tabella '{table}' non trovata nel database. Sincronizzazione saltata.")
        else:
            logging.error(f"Errore MySQL durante la sincronizzazione di '{table}': {err}")
    finally:
        cursor.close()


def synchronize_all(cfg, conn):
    """Run synchronize_table for every entry in SyncMappings.

    Skips everything (with an error log) when the API token is missing
    or still contains the "INSERIRE" placeholder.
    """
    logging.info("=== Inizio processo di sincronizzazione con API ===")
    token = cfg.get("AdminToken")
    if not token or "INSERIRE" in token:
        logging.error("Token API non configurato in agent_config.json. Sincronizzazione saltata.")
        return
    for mapping in cfg.get("SyncMappings", []):
        synchronize_table(cfg, mapping, conn)
    logging.info("=== Fine processo di sincronizzazione con API ===")


# --- MAIN LOGIC ---

if __name__ == "__main__":
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "agent_config.json")
    try:
        cfg = load_config(config_path)
    except FileNotFoundError:
        logging.basicConfig(level=logging.ERROR)
        logging.error(f"File di configurazione non trovato in '{config_path}'")
        # Fix: sys.exit instead of the site-injected exit() builtin,
        # which is not guaranteed in all run modes.
        sys.exit(1)
    setup_logging(cfg["LogDirectory"], cfg.get("LogFile", "agent.log"), cfg.get("Debug", False))
    conn = None
    try:
        conn = get_mysql_connection(cfg)
        logging.info("Connessione a MySQL stabilita.")
        # 1. Read all CSVs (including subfolders) and load them into MySQL
        scan_and_import(cfg, conn)
        # 2. Read the configuration and synchronize the mapped tables
        synchronize_all(cfg, conn)
    except mysql.connector.Error as err:
        logging.error(f"Errore di connessione a MySQL: {err}")
    except Exception as e:
        logging.error(f"Errore imprevisto nel processo principale: {e}", exc_info=True)
    finally:
        if conn and conn.is_connected():
            conn.close()
            logging.info("Connessione a MySQL chiusa.")