"""
CSV → MySQL import script for NETGESCON.

- Configuration read from agent_config.json (Laravel .env style)
- Automatic table-schema updates
- Updates only when the data changes (MD5 hash)
- Detailed logging
- Optional primary-key (PK) handling
- Commented, scalable code

Dependencies:
    pip install pandas mysql-connector-python python-dotenv
"""
|
|
|
|
import os
|
|
import json
|
|
import pandas as pd
|
|
import mysql.connector
|
|
import hashlib
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
# --- CONFIGURAZIONE ---
|
|
|
|
def load_config(config_path):
    """Read the agent configuration file (JSON, Laravel .env style) and return it as a dict."""
    with open(config_path, encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
|
|
|
|
def get_mysql_connection(cfg):
    """Open and return a MySQL connection using the settings in *cfg*."""
    params = {
        "host": cfg["MySQLHost"],
        "port": cfg["MySQLPort"],
        "user": cfg["MySQLUser"],
        "password": cfg["MySQLPassword"],
        "database": cfg["MySQLDatabase"],
    }
    return mysql.connector.connect(**params)
|
|
|
|
# --- LOGGING ---
|
|
|
|
def setup_logging(log_dir, log_file):
    """Configure root logging to write to both console and log_dir/log_file."""
    os.makedirs(log_dir, exist_ok=True)
    destination = os.path.join(log_dir, log_file)
    handlers = [
        logging.FileHandler(destination, encoding="utf-8"),
        logging.StreamHandler(),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers,
    )
|
|
|
|
# --- UTILS ---
|
|
|
|
def infer_sql_type(val):
    """Infer a MySQL column type from a sample value.

    Tries INT first, then FLOAT; otherwise returns TEXT for strings longer
    than 255 characters and VARCHAR(255) for everything else.
    """
    # Fix: the original used bare `except:`, which also swallows
    # KeyboardInterrupt/SystemExit and hides real bugs. Only conversion
    # failures (ValueError/TypeError) mean "not this numeric type".
    try:
        int(val)
        return "INT"
    except (ValueError, TypeError):
        pass
    try:
        float(val)
        return "FLOAT"
    except (ValueError, TypeError):
        pass
    if isinstance(val, str) and len(val) > 255:
        return "TEXT"
    return "VARCHAR(255)"
|
|
|
|
def get_existing_columns(cursor, table):
    """Return the list of column names currently defined on *table*."""
    cursor.execute(f"SHOW COLUMNS FROM `{table}`")
    # SHOW COLUMNS returns one row per column; the name is the first field.
    return [name for name, *_rest in cursor.fetchall()]
|
|
|
|
def get_table_pk(table):
    """Return the primary-key column list for *table*, or [] when unknown.

    Extend the mapping below as new tables (and their PKs) are added.
    """
    known_keys = {
        "Stabili": ["id_stabile"],
        "Fornitori": ["id_fornitore"],
        "condomin": ["id_cond"],
        "rate": ["id_rate"],
        "incassi": ["ID_incasso"],
        # ...add more tables and their PKs here...
    }
    return known_keys.get(table, [])
|
|
|
|
def hash_row(row, fields):
    """Return the MD5 hex digest of the row's data fields, joined with '|'."""
    parts = [str(row[field]) for field in fields]
    joined = "|".join(parts)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()
|
|
|
|
def table_exists(cursor, table):
    """Return True if *table* exists in the current database.

    Fix: the table name is passed as a bound parameter instead of being
    interpolated into the SQL string, so a quote in the name can no longer
    break (or inject into) the query. Note that LIKE wildcards (% and _)
    in the name still match loosely, exactly as in the original.
    """
    cursor.execute("SHOW TABLES LIKE %s", (table,))
    return cursor.fetchone() is not None
|
|
|
|
# --- IMPORTAZIONE ---
|
|
|
|
def create_or_update_table(cursor, table, df):
    """Create *table* from the DataFrame's columns, or ALTER it to add
    columns that are missing.

    The SQL type of each column is inferred from its first non-null value
    (empty string when the column is entirely null, yielding VARCHAR(255)).
    """
    columns = []
    for col in df.columns:
        # Fix: the original evaluated df[col].dropna() twice per column;
        # compute the non-null view once.
        non_null = df[col].dropna()
        sample_val = non_null.iloc[0] if not non_null.empty else ""
        columns.append((col, infer_sql_type(sample_val)))
    if not table_exists(cursor, table):
        # Create the table, including the primary key when one is mapped.
        pk = get_table_pk(table)
        fields = ", ".join(f"`{c}` {t}" for c, t in columns)
        pk_sql = f", PRIMARY KEY ({', '.join(f'`{k}`' for k in pk)})" if pk else ""
        sql = f"CREATE TABLE `{table}` ({fields}{pk_sql}) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
        cursor.execute(sql)
        logging.info(f"Tabella '{table}' creata.")
    else:
        # Add any column present in the CSV but missing from the table.
        existing = get_existing_columns(cursor, table)
        for c, t in columns:
            if c not in existing:
                cursor.execute(f"ALTER TABLE `{table}` ADD COLUMN `{c}` {t}")
                logging.info(f"Tabella '{table}' aggiornata: aggiunta colonna '{c}'.")
|
|
|
|
def import_csv_to_mysql(cfg, csv_path, table, conn, hash_dir):
    """Import one CSV file into a MySQL table, skipping the write when the
    file content is unchanged since the last run (MD5 stored in *hash_dir*).

    Args:
        cfg: configuration dict (uses optional "CSVEncoding", default utf-8).
        csv_path: path of the CSV file to import.
        table: destination table name (schema created/updated as needed).
        conn: open MySQL connection; committed on success.
        hash_dir: directory holding per-table MD5 files.
    Errors are logged, not raised — one bad file must not stop the batch.
    """
    try:
        df = pd.read_csv(csv_path, dtype=str, encoding=cfg.get("CSVEncoding", "utf-8"))
        df = df.fillna("")
        cursor = conn.cursor()
        try:
            # Schema first, so new columns are added even when data is unchanged.
            create_or_update_table(cursor, table, df)
            pk = get_table_pk(table)
            # Hash the raw file to skip no-op imports.
            # Fix: the original did open(csv_path, "rb").read() and leaked
            # the file handle; use a context manager.
            hash_file = os.path.join(hash_dir, f"{table}.md5")
            with open(csv_path, "rb") as csv_file:
                file_hash = hashlib.md5(csv_file.read()).hexdigest()
            if os.path.exists(hash_file):
                with open(hash_file, "r") as f:
                    last_hash = f.read().strip()
                if last_hash == file_hash:
                    logging.info(f"{table}: dati invariati, nessun update.")
                    return
            # Insert/update rows.
            cols = ", ".join(f"`{c}`" for c in df.columns)
            vals = ", ".join(["%s"] * len(df.columns))
            if pk:
                # Upsert: update the data columns when the PK already exists.
                update_sql = ", ".join(
                    f"`{c}`=VALUES(`{c}`)" for c in df.columns if c not in pk
                )
                sql = (
                    f"INSERT INTO `{table}` ({cols}) VALUES ({vals}) "
                    f"ON DUPLICATE KEY UPDATE {update_sql}"
                )
            else:
                # No known PK: REPLACE relies on whatever unique index exists.
                sql = f"REPLACE INTO `{table}` ({cols}) VALUES ({vals})"
            for row in df.itertuples(index=False, name=None):
                cursor.execute(sql, row)
            conn.commit()
            # Persist the hash only after a successful commit.
            os.makedirs(hash_dir, exist_ok=True)
            with open(hash_file, "w") as f:
                f.write(file_hash)
            logging.info(f"{table}: dati importati/aggiornati ({len(df)} righe).")
        finally:
            # Fix: the original closed the cursor only on the success/skip
            # paths, leaking it on any exception.
            cursor.close()
    except Exception as e:
        logging.error(f"Errore importazione {table}: {e}")
|
|
|
|
def scan_and_import(cfg):
    """Recursively scan cfg["InputDirectory"] and import every CSV found.

    Each CSV is imported into a table named after the file (extension
    stripped). Per-table MD5 hashes live in cfg["HashDirectory"]
    (default "./hash").
    """
    input_dir = cfg["InputDirectory"]
    hash_dir = cfg.get("HashDirectory", "./hash")
    conn = get_mysql_connection(cfg)
    try:
        for root, _dirs, files in os.walk(input_dir):
            for file in files:
                if not file.lower().endswith(".csv"):
                    continue
                csv_path = os.path.join(root, file)
                table = os.path.splitext(file)[0]
                logging.info(f"Import {csv_path} -> {table}")
                import_csv_to_mysql(cfg, csv_path, table, conn, hash_dir)
    finally:
        # Fix: the original never closed the connection if os.walk (or an
        # import) raised; always release it.
        conn.close()
|
|
|
|
# --- MAIN ---
|
|
|
|
def _main():
    """Entry point: load config, set up logging, run the batch import."""
    # The config file sits next to this script (Laravel .env style).
    cfg_file = os.path.join(os.path.dirname(__file__), "agent_config.json")
    settings = load_config(cfg_file)
    setup_logging(settings["LogDirectory"], settings.get("LogFile", "agent.log"))
    logging.info("=== Avvio importazione batch NETGESCON ===")
    scan_and_import(settings)
    logging.info("=== Fine importazione batch NETGESCON ===")


if __name__ == "__main__":
    _main()