netgescon-master/scripts/miki.py

345 lines
14 KiB
Python

"""
Script di importazione CSV → MySQL per NETGESCON.
- Configurazione letta da agent_config.json (stile .env di Laravel)
- Aggiornamento automatico schema tabelle
- Update solo se i dati cambiano (hash MD5)
- Logging dettagliato
- Gestione PK (chiavi primarie) opzionale
- Controllo relazioni tra tabelle chiave
- Generazione report struttura archivi
- Codice commentato e scalabile
Dipendenze:
pip install pandas mysql-connector-python
"""
import os
import json
import pandas as pd
import mysql.connector
import hashlib
import logging
# --- CONFIGURAZIONE ---
def load_config(config_path):
"""Carica la configurazione da agent_config.json"""
with open(config_path, encoding="utf-8") as f:
return json.load(f)
def get_mysql_connection(cfg):
"""Restituisce una connessione MySQL usando la config"""
return mysql.connector.connect(
host=cfg["MySQLHost"],
port=cfg["MySQLPort"],
user=cfg["MySQLUser"],
password=cfg["MySQLPassword"],
database=cfg["MySQLDatabase"]
)
# --- LOGGING ---
def setup_logging(log_dir, log_file):
"""Imposta il logging su file e console"""
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, log_file)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(log_path, encoding="utf-8"),
logging.StreamHandler()
]
)
# --- UTILS ---
def infer_sql_type(val):
"""Inferisce il tipo SQL da un valore di esempio"""
try:
int(val)
return "INT"
except:
try:
float(val)
return "FLOAT"
except:
if isinstance(val, str) and len(val) > 255:
return "TEXT"
return "VARCHAR(255)"
def get_existing_columns(cursor, table):
"""Restituisce la lista delle colonne esistenti per una tabella"""
cursor.execute(f"SHOW COLUMNS FROM `{table}`")
return [row[0] for row in cursor.fetchall()]
def get_table_pk(table):
"""
Restituisce la chiave primaria per la tabella.
Puoi estendere questa funzione per mappare le PK delle tue tabelle.
"""
pk_map = {
"stabili": ["id_stabile"],
"fornitori": ["id_fornitore"],
"condomin": ["id_cond"],
"rate": ["id_rate"],
"incassi": ["ID_incasso"],
"emes_det": ["n_emissione", "anno_emissione", "n_ricevuta"], # esempio, personalizza secondo i tuoi dati
# ...aggiungi qui altre tabelle e le loro PK...
}
return pk_map.get(table.lower(), [])
def table_exists(cursor, table):
cursor.execute(f"SHOW TABLES LIKE '{table}'")
return cursor.fetchone() is not None
# --- IMPORTAZIONE ---
def create_or_update_table(cursor, table, df):
"""
Crea la tabella se non esiste, oppure aggiunge nuove colonne se necessario.
"""
columns = []
for col in df.columns:
sample_val = df[col].dropna().iloc[0] if not df[col].dropna().empty else ""
columns.append((col, infer_sql_type(sample_val)))
if not table_exists(cursor, table):
# Crea tabella con PK se definita
pk = get_table_pk(table)
fields = ", ".join([f"`{c}` {t}" for c, t in columns])
pk_sql = f", PRIMARY KEY ({', '.join([f'`{k}`' for k in pk])})" if pk else ""
sql = f"CREATE TABLE `{table.lower()}` ({fields}{pk_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4" # Nome tabella in minuscolo
cursor.execute(sql)
logging.info(f"Tabella '{table}' creata con colonne: {[c for c, _ in columns]}")
else:
# Aggiorna tabella se ci sono nuovi campi
existing = get_existing_columns(cursor, table)
for c, t in columns:
if c not in existing:
cursor.execute(f"ALTER TABLE `{table.lower()}` ADD COLUMN `{c}` {t}") # Nome tabella in minuscolo
logging.info(f"Tabella '{table}' aggiornata: aggiunta colonna '{c}' ({t})")
logging.info(f"Tabella '{table}' colonne esistenti: {existing}")
logging.info(f"Tabella '{table}' colonne CSV: {[c for c, _ in columns]}")
def import_csv_to_mysql(cfg, csv_path, table, conn, hash_dir):
print(f"[DEBUG] Import {csv_path} -> {table}")
try:
df = pd.read_csv(csv_path, dtype=str, encoding=cfg.get("CSVEncoding", "utf-8"))
df = df.fillna("")
df.columns = [col.strip() for col in df.columns] # Pulisce i nomi delle colonne
cursor = conn.cursor()
table_name_lower = table.lower() # Usa nome tabella normalizzato
create_or_update_table(cursor, table_name_lower, df)
pk = get_table_pk(table_name_lower) # Usa nome tabella normalizzato per PK
hash_file = os.path.join(hash_dir, f"{table}.md5")
file_hash = hashlib.md5(open(csv_path, "rb").read()).hexdigest()
if os.path.exists(hash_file):
with open(hash_file, "r") as f:
last_hash = f.read().strip()
if last_hash == file_hash:
logging.info(f"{table}: dati invariati, nessun update.")
# cursor.close() # Non chiudere il cursore qui, verrà chiuso alla fine o in caso di eccezione
return
# Inserimento/aggiornamento dati
cols = ", ".join([f"`{c}`" for c in df.columns])
vals = ", ".join(["%s"] * len(df.columns))
if pk:
# Update se PK esiste, altrimenti insert
update_sql = ", ".join([f"`{c}`=VALUES(`{c}`)" for c in df.columns if c not in pk])
sql = f"INSERT INTO `{table_name_lower}` ({cols}) VALUES ({vals}) ON DUPLICATE KEY UPDATE {update_sql}"
else:
sql = f"REPLACE INTO `{table_name_lower}` ({cols}) VALUES ({vals})"
for row in df.itertuples(index=False, name=None):
cursor.execute(sql, row)
conn.commit()
# Salva hash
os.makedirs(hash_dir, exist_ok=True)
with open(hash_file, "w", encoding="utf-8") as f:
f.write(file_hash)
logging.info(f"{table}: dati importati/aggiornati ({len(df)} righe).")
except Exception as e:
logging.error(f"Errore importazione {table} da {csv_path}: {e}")
finally:
cursor.close()
def scan_and_import(cfg):
print(">>> scan_and_import chiamata", flush=True)
print("=== INIZIO SCANSIONE ===")
input_dir = cfg["InputDirectory"]
print(f"[DEBUG] InputDirectory: {input_dir}")
hash_dir = cfg.get("HashDirectory", "./hash")
os.makedirs(hash_dir, exist_ok=True)
conn = get_mysql_connection(cfg)
archivio_report = []
csv_trovati_totale = []
for root, dirs, files in os.walk(input_dir):
print(f"[DEBUG] SCANSIONE CARTELLA: {root}", flush=True)
print(f"[DEBUG] Sottocartelle: {dirs}", flush=True)
print(f"[DEBUG] File trovati: {files}", flush=True)
csv_trovati_in_questa_cartella = []
for file in files:
print(f"[DEBUG] Analizzo file: {file}")
if file.lower().endswith(".csv"):
csv_path = os.path.join(root, file)
parent_folder = os.path.basename(os.path.dirname(csv_path))
table_name_original = f"{parent_folder}_{os.path.splitext(file)[0]}"
table_name_for_db = table_name_original.lower()
print(f"[DEBUG] CSV trovato: {csv_path} -> Tabella: {table_name_for_db}")
logging.info(f"Trovato CSV: {csv_path} -> Tabella DB: {table_name_for_db} (Originale: {table_name_original})")
csv_trovati_in_questa_cartella.append(file)
csv_trovati_totale.append(csv_path)
try:
df = pd.read_csv(csv_path, dtype=str, encoding=cfg.get("CSVEncoding", "utf-8"))
df.columns = [c.strip() for c in df.columns]
archivio_report.append({
"source": csv_path,
"table_original_name": table_name_original,
"table_db_name": table_name_for_db,
"fields": list(df.columns),
"rows": len(df)
})
except Exception as e:
logging.error(f"Errore lettura {csv_path} per report: {e}")
import_csv_to_mysql(cfg, csv_path, table_name_original, conn, hash_dir)
if csv_trovati_in_questa_cartella:
print(f"[DEBUG] CSV trovati in '{root}': {len(csv_trovati_in_questa_cartella)}")
else:
print(f"[DEBUG] Nessun CSV trovato in '{root}'.")
conn.close()
output_dir_report = cfg.get("OutputDirectory", ".")
os.makedirs(output_dir_report, exist_ok=True)
report_path = os.path.join(output_dir_report, "report_archivio.json")
try:
with open(report_path, "w", encoding="utf-8") as f:
json.dump(archivio_report, f, indent=2, ensure_ascii=False)
logging.info(f"Report struttura archivi generato in {report_path}")
except Exception as e:
logging.error(f"Errore durante il salvataggio del report {report_path}: {e}")
logging.info(f"Totale CSV analizzati per il report: {len(archivio_report)}")
print(f"\nTotale CSV trovati in tutte le cartelle: {len(csv_trovati_totale)}")
for f in csv_trovati_totale:
print(f" - {f}")
# --- CONTROLLO RELAZIONI ---
def check_relations(cfg):
"""
Controllo preliminare delle relazioni tra le tabelle chiave.
Stampa e logga eventuali anomalie (es. condomini senza stabile, rate senza condominio, incassi senza rate).
"""
try:
conn = get_mysql_connection(cfg)
cur = conn.cursor()
# 1. Condomini senza stabile associato
cur.execute("""
SELECT c.id_cond, c.cartella_condominio
FROM condomin c
LEFT JOIN stabili s ON c.cartella_condominio = s.nome_directory
WHERE s.nome_directory IS NULL
LIMIT 10
""")
orfani = cur.fetchall()
if orfani:
print("[ATTENZIONE] Condomini senza stabile associato:", orfani)
logging.warning(f"Condomini senza stabile associato: {orfani}")
else:
print("[OK] Tutti i condomini hanno uno stabile associato.")
logging.info("Tutti i condomini hanno uno stabile associato.")
# 2. Rate senza condominio associato
cur.execute("""
SELECT r.id_rate, r.cartella_condominio
FROM rate r
LEFT JOIN condomin c ON r.cartella_condominio = c.cartella_condominio
WHERE c.cartella_condominio IS NULL
LIMIT 10
""")
orfani = cur.fetchall()
if orfani:
print("[ATTENZIONE] Rate senza condominio associato:", orfani)
logging.warning(f"Rate senza condominio associato: {orfani}")
else:
print("[OK] Tutte le rate hanno un condominio associato.")
logging.info("Tutte le rate hanno un condominio associato.")
# 3. Incassi senza rata associata
cur.execute("""
SELECT i.ID_incasso, i.cartella_condominio, i.id_rate
FROM incassi i
LEFT JOIN rate r ON i.cartella_condominio = r.cartella_condominio AND i.id_rate = r.id_rate
WHERE r.id_rate IS NULL
LIMIT 10
""")
orfani = cur.fetchall()
if orfani:
print("[ATTENZIONE] Incassi senza rata associata:", orfani)
logging.warning(f"Incassi senza rata associata: {orfani}")
else:
print("[OK] Tutti gli incassi hanno una rata associata.")
logging.info("Tutti gli incassi hanno una rata associata.")
# 4. Fornitori senza stabile associato (se serve)
cur.execute("""
SELECT f.id_fornitore, f.cartella_condominio
FROM fornitori f
LEFT JOIN stabili s ON f.cartella_condominio = s.nome_directory
WHERE s.nome_directory IS NULL
LIMIT 10
""")
orfani = cur.fetchall()
if orfani:
print("[ATTENZIONE] Fornitori senza stabile associato:", orfani)
logging.warning(f"Fornitori senza stabile associato: {orfani}")
else:
print("[OK] Tutti i fornitori hanno uno stabile associato.")
logging.info("Tutti i fornitori hanno uno stabile associato.")
cur.close()
conn.close()
except Exception as e:
print("[ERRORE] Controllo relazioni:", e)
logging.error(f"Errore controllo relazioni: {e}")
# --- MAIN ---
if __name__ == "__main__":
print(">>> MAIN PARTITO", flush=True)
config_path = os.path.join(os.path.dirname(__file__), "agent_config.json")
cfg = load_config(config_path)
setup_logging(cfg["LogDirectory"], cfg.get("LogFile", "agent.log"))
logging.info("=== Avvio importazione batch NETGESCON ===")
print("CONFIG:", cfg, flush=True)
print("InputDirectory:", cfg["InputDirectory"], flush=True)
scan_and_import(cfg)
check_relations(cfg)
logging.info("=== Fine importazione batch NETGESCON ===")
# Elaborazione CSV "stabili" separatamente
try:
conn = get_mysql_connection(cfg)
cur = conn.cursor()
# Mappatura cartelle → ID stabili (da usare per gli altri CSV)
stabili_mapping = {}
cur.execute("SELECT id_stabile, nome_directory FROM stabili")
for id_stabile, nome_directory in cur.fetchall():
stabili_mapping[nome_directory] = id_stabile
# Elaborazione CSV in INPUT_ROOT (esclusi quelli già trattati)
INPUT_ROOT = cfg["InputDirectory"]
for root, _, files in os.walk(INPUT_ROOT):
for fname in files:
if fname.lower().endswith(".csv") and not fname.lower().startswith("stabili"):
csv_path = os.path.join(root, fname)
process_csv(csv_path, cur, stabili_mapping)
conn.commit()
except Exception as e:
logging.error(f"Errore durante l'elaborazione dei CSV: {e}")
finally:
cur.close()
conn.close()