# netgescon-master/scripts/import_netgescon_csv_mysql_agent.py
import os
import json
import pandas as pd
import mysql.connector
import hashlib
import logging
import requests
from logging.handlers import RotatingFileHandler
# --- CONFIGURAZIONE E LOGGING ---
def load_config(config_path):
    """Load and parse the JSON agent configuration at *config_path*."""
    with open(config_path, encoding="utf-8") as cfg_file:
        raw = cfg_file.read()
    return json.loads(raw)
def setup_logging(log_dir, log_file, debug=False):
    """Configure root logging to a rotating file plus the console.

    Creates *log_dir* if it does not exist.  Level is DEBUG when *debug*
    is true, INFO otherwise; files rotate at 5 MiB with 3 backups.
    """
    os.makedirs(log_dir, exist_ok=True)
    file_handler = RotatingFileHandler(
        os.path.join(log_dir, log_file),
        maxBytes=5 * 1024 * 1024,
        backupCount=3,
        encoding="utf-8",
    )
    chosen_level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(
        level=chosen_level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[file_handler, logging.StreamHandler()],
    )
def get_mysql_connection(cfg):
    """Open a MySQL connection using the credentials stored in the agent config."""
    params = {
        "host": cfg["MySQLHost"],
        "port": cfg["MySQLPort"],
        "user": cfg["MySQLUser"],
        "password": cfg["MySQLPassword"],
        "database": cfg["MySQLDatabase"],
    }
    return mysql.connector.connect(**params)
# --- UTILS DATABASE E HASH ---
def add_sync_columns_if_not_exist(cursor, table):
    """Ensure *table* carries the two bookkeeping columns used by the sync step.

    Adds `hash_sync` (hash of the last payload sent) and `sincronizzato`
    (whether the row has been pushed to the API) when they are missing.
    """
    wanted = (
        ("hash_sync", "ADD COLUMN `hash_sync` VARCHAR(32) NULL"),
        ("sincronizzato", "ADD COLUMN `sincronizzato` BOOLEAN NOT NULL DEFAULT FALSE"),
    )
    for column, ddl in wanted:
        cursor.execute(f"SHOW COLUMNS FROM `{table}` LIKE '{column}'")
        if cursor.fetchone():
            continue  # column already present
        cursor.execute(f"ALTER TABLE `{table}` {ddl}")
        logging.info(f"Aggiunta colonna '{column}' a tabella '{table}'")
def calculate_hash(data_dict):
    """Return a stable MD5 hex digest of *data_dict*.

    None-valued entries are dropped and keys are serialized in sorted order
    so that logically equal payloads always produce the same hash.
    """
    significant = {key: value for key, value in data_dict.items() if value is not None}
    serialized = json.dumps(significant, sort_keys=True, default=str)
    return hashlib.md5(serialized.encode("utf-8")).hexdigest()
# --- IMPORTAZIONE CSV -> MYSQL ---
def import_csv_to_mysql(cfg, csv_path, table, conn):
    """Load one CSV file into the MySQL table *table*, creating it if needed.

    Column types are inferred from the data (TEXT when any value exceeds
    255 chars, VARCHAR(255) otherwise).  When a primary key is declared
    for this table in cfg["SyncMappings"], rows are upserted with
    ON DUPLICATE KEY UPDATE; otherwise REPLACE INTO is used.
    Errors are logged and swallowed so one bad file does not abort the run.
    """
    cursor = None
    try:
        df = pd.read_csv(csv_path, dtype=str, encoding=cfg.get("CSVEncoding", "utf-8")).fillna("")
        df.columns = [col.strip() for col in df.columns]
        cursor = conn.cursor()
        # Create/refresh the table; the PK (if any) comes from the sync mapping.
        pk_map = {m['table']: m['pk'] for m in cfg.get("SyncMappings", [])}
        pk = pk_map.get(table)
        cols_sql_parts = []
        for c in df.columns:
            # Note: on an empty frame max() is NaN and NaN > 255 is False,
            # so empty columns fall back to VARCHAR(255).
            col_type = "TEXT" if df[c].str.len().max() > 255 else "VARCHAR(255)"
            cols_sql_parts.append(f"`{c}` {col_type}")
        pk_sql = f", PRIMARY KEY (`{pk}`)" if pk else ""
        cursor.execute(f"CREATE TABLE IF NOT EXISTS `{table}` ({', '.join(cols_sql_parts)}{pk_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
        add_sync_columns_if_not_exist(cursor, table)
        # Insert/update logic: upsert on PK when available.
        cols = ", ".join([f"`{c}`" for c in df.columns])
        vals = ", ".join(["%s"] * len(df.columns))
        if pk:
            update_sql = ", ".join([f"`{c}`=VALUES(`{c}`)" for c in df.columns if c != pk])
            sql = f"INSERT INTO `{table}` ({cols}) VALUES ({vals}) ON DUPLICATE KEY UPDATE {update_sql}"
        else:
            sql = f"REPLACE INTO `{table}` ({cols}) VALUES ({vals})"
        cursor.executemany(sql, df.to_records(index=False).tolist())
        conn.commit()
        logging.info(f"Importate/Aggiornate {len(df)} righe da '{os.path.basename(csv_path)}' nella tabella '{table}'.")
    except Exception as e:
        logging.error(f"Errore durante importazione di {csv_path} in {table}: {e}", exc_info=True)
    finally:
        # Fix: the original only closed the cursor on the success path,
        # leaking it whenever read_csv or any SQL statement raised.
        if cursor is not None:
            cursor.close()
def scan_and_import(cfg, conn):
    """Walk cfg["InputDirectory"] recursively and import every CSV into MySQL.

    Each file maps to a table named after the file itself
    (lowercased, extension stripped).
    """
    logging.info("--- Inizio scansione e importazione CSV in MySQL ---")
    input_dir = cfg["InputDirectory"]
    if not os.path.isdir(input_dir):
        logging.error(f"La cartella di input '{input_dir}' non esiste.")
        return
    # os.walk descends into year/condominium subfolders automatically.
    for root, _dirs, filenames in os.walk(input_dir):
        for name in (n for n in filenames if n.lower().endswith(".csv")):
            table_name = os.path.splitext(name)[0].lower()
            import_csv_to_mysql(cfg, os.path.join(root, name), table_name, conn)
    logging.info("--- Fine scansione e importazione CSV ---")
# --- SINCRONIZZAZIONE MYSQL -> API ---
def send_to_api(endpoint, token, payload):
    """POST *payload* as JSON to *endpoint* with a Bearer token.

    Returns True on a 2xx response, False on any request/HTTP failure
    (failures are logged, including the response body when one exists).
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }
    try:
        response = requests.post(endpoint, headers=headers, json=payload, timeout=15)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error(f"Errore API per {endpoint}: {e}")
        # Connection-level failures carry no response object.
        if e.response is not None:
            logging.error(f"Dettagli errore API: {e.response.text}")
        return False
    logging.info(f"Dati inviati a {endpoint}. Risposta: {response.status_code}")
    return True
def synchronize_table(cfg, mapping, conn):
    """Push changed rows of one mapped table to the remote API.

    A row is sent when its payload hash differs from the stored `hash_sync`
    or when it was never flagged `sincronizzato`.  After each successful
    send the two bookkeeping columns are updated and committed immediately,
    so progress survives a crash mid-run.

    mapping keys: 'table', 'pk', 'endpoint',
    'payload_fields' (db column -> API field name).
    """
    table = mapping['table']
    pk_col = mapping['pk']
    endpoint = cfg['ApiBaseUrl'] + mapping['endpoint']
    token = cfg['AdminToken']
    payload_fields = mapping['payload_fields']
    logging.info(f"--- Inizio sincronizzazione tabella: {table} ---")
    cursor = conn.cursor(dictionary=True)
    update_cursor = None
    try:
        db_fields = list(payload_fields.keys())
        select_fields = list(set(db_fields + [pk_col, 'hash_sync', 'sincronizzato']))
        cursor.execute(f"SELECT {', '.join(f'`{f}`' for f in select_fields)} FROM `{table}`")
        records = cursor.fetchall()
        # Improvement: one reusable cursor and one prepared statement for the
        # bookkeeping updates, instead of opening/closing a cursor per record.
        update_cursor = conn.cursor()
        sql_update = f"UPDATE `{table}` SET hash_sync = %s, sincronizzato = TRUE WHERE `{pk_col}` = %s"
        updated_count = 0
        for record in records:
            payload = {api_key: record.get(db_key) for db_key, api_key in payload_fields.items()}
            current_hash = calculate_hash(payload)
            if current_hash == record.get('hash_sync') and record.get('sincronizzato'):
                continue  # unchanged and already synced
            logging.info(f"Sincronizzazione record {table} con PK {record[pk_col]} (hash cambiato o mai sincronizzato).")
            if send_to_api(endpoint, token, payload):
                update_cursor.execute(sql_update, (current_hash, record[pk_col]))
                conn.commit()  # per-row commit: keep progress on later failures
                updated_count += 1
        logging.info(f"--- Fine sincronizzazione {table}. Record inviati/aggiornati: {updated_count}/{len(records)} ---")
    except mysql.connector.Error as err:
        if err.errno == 1146:  # ER_NO_SUCH_TABLE
            logging.warning(f"Tabella '{table}' non trovata nel database. Sincronizzazione saltata.")
        else:
            logging.error(f"Errore MySQL durante la sincronizzazione di '{table}': {err}")
    finally:
        if update_cursor is not None:
            update_cursor.close()
        cursor.close()
def synchronize_all(cfg, conn):
    """Run synchronize_table for every entry in cfg["SyncMappings"].

    Skipped entirely when the API token is missing or still the
    "INSERIRE..." placeholder from the config template.
    """
    logging.info("=== Inizio processo di sincronizzazione con API ===")
    token = cfg.get("AdminToken")
    if not token or "INSERIRE" in token:
        logging.error("Token API non configurato in agent_config.json. Sincronizzazione saltata.")
        return
    mappings = cfg.get("SyncMappings", [])
    for mapping in mappings:
        synchronize_table(cfg, mapping, conn)
    logging.info("=== Fine processo di sincronizzazione con API ===")
# --- LOGICA PRINCIPALE ---
if __name__ == "__main__":
    # Entry point: load config, import CSVs into MySQL, then sync to the API.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(base_dir, "agent_config.json")
    try:
        cfg = load_config(config_path)
    except FileNotFoundError:
        logging.basicConfig(level=logging.ERROR)
        logging.error(f"File di configurazione non trovato in '{config_path}'")
        raise SystemExit(1)
    except json.JSONDecodeError as e:
        # Fix: a malformed config file previously escaped as an unhandled
        # traceback instead of a clean error message.
        logging.basicConfig(level=logging.ERROR)
        logging.error(f"File di configurazione non valido '{config_path}': {e}")
        raise SystemExit(1)
    setup_logging(cfg["LogDirectory"], cfg.get("LogFile", "agent.log"), cfg.get("Debug", False))
    conn = None
    try:
        conn = get_mysql_connection(cfg)
        logging.info("Connessione a MySQL stabilita.")
        # 1. Read every CSV (including subfolders) and load it into MySQL.
        scan_and_import(cfg, conn)
        # 2. Push the configured table mappings to the API.
        synchronize_all(cfg, conn)
    except mysql.connector.Error as err:
        logging.error(f"Errore di connessione a MySQL: {err}")
    except Exception as e:
        logging.error(f"Errore imprevisto nel processo principale: {e}", exc_info=True)
    finally:
        if conn and conn.is_connected():
            conn.close()
            logging.info("Connessione a MySQL chiusa.")