#!/usr/bin/env python3
"""
import_mdb_to_mysql_advanced.py
Script avanzato per importazione dati da file MDB a MySQL.
- Ricerca ricorsiva file MDB (come lo script bash).
- Nessun file temporaneo: tutto in memoria.
- Per ogni tabella:
- Estrae i dati direttamente via mdb-export in streaming.
- Calcola hash MD5 di ogni riga (come facevi con i CSV).
- Deduplica: importa solo nuove righe in MySQL, evita doppioni.
- Aggiorna struttura tabella MySQL se cambia modello.
- Logging avanzato (file JSON).
- Pronto per essere usato/esteso in futuro, anche per route Laravel/backend API.
Autore: Pikappa2 & Copilot
Ultimo agg.: 2025-06-02
"""
import os
import pymysql
import subprocess
import csv
import json
import hashlib
import datetime
import re
# --- CONFIGURATION ---
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "../agent_config.json")
LOGDIR = os.path.join(SCRIPT_DIR, "../log")
os.makedirs(LOGDIR, exist_ok=True)
LOGFILE = os.path.join(LOGDIR, "import_mdb_to_mysql.jsonlog")
# Config from external JSON file
with open(CONFIG_PATH) as f:
    config = json.load(f)
MYSQL_HOST = config.get("MySQLHost", "localhost")
MYSQL_DB = config.get("MySQLDatabase", "netgescon")
MYSQL_USER = config.get("MySQLUser", "root")
MYSQL_PW = config.get("MySQLPassword", "password")
INPUT_ROOT = config.get("InputDirectory", SCRIPT_DIR)
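# Illustrative agent_config.json (the keys match the config.get() calls above;
# the values shown are just the script defaults, and InputDirectory is a placeholder path):
#
#   {
#       "MySQLHost": "localhost",
#       "MySQLDatabase": "netgescon",
#       "MySQLUser": "root",
#       "MySQLPassword": "password",
#       "InputDirectory": "/path/to/mdb/archives"
#   }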
# --- LOGGING ---
def log_event(event, **kwargs):
    row = {"event": event, "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    row.update(kwargs)
    with open(LOGFILE, "a") as f:
        f.write(json.dumps(row, ensure_ascii=False) + "\n")
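# Each call appends one JSON line to import_mdb_to_mysql.jsonlog, for example
# (hypothetical values):
#   {"event": "imported_rows", "timestamp": "2025-06-02 12:00:00", "table": "mdb_...", "rows": 42}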
# --- MYSQL UTILS ---
def connect_mysql():
    return pymysql.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PW,
        database=MYSQL_DB,
        charset="utf8mb4",
        autocommit=True
    )
def infer_type(val):
    if val is None or str(val).strip() == "":
        return "VARCHAR(255)"
    s = str(val)
    if re.fullmatch(r"-?\d+", s):
        return "INT"
    try:
        float(s.replace(",", "."))
        return "FLOAT"
    except Exception:
        pass
    if len(s) > 255:
        return "TEXT"
    return "VARCHAR(255)"
def get_existing_columns(cur, table_name):
    try:
        cur.execute(f"SHOW COLUMNS FROM `{table_name}`;")
        return set([row[0] for row in cur.fetchall()])
    except Exception:
        return set()
def alter_table_add_columns(cur, table_name, header, sample_row):
    existing_cols = get_existing_columns(cur, table_name)
    for col in header:
        if col not in existing_cols:
            t = infer_type(sample_row.get(col, ""))
            sql = f"ALTER TABLE `{table_name}` ADD COLUMN `{col}` {t} NULL"
            cur.execute(sql)
def create_table(cur, table_name, header, sample_row):
    fields_types = []
    for col in header:
        sample = sample_row.get(col, "")
        t = infer_type(sample)
        fields_types.append(f"`{col}` {t}")
    sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` (" \
          f"id INT AUTO_INCREMENT PRIMARY KEY, " \
          f"{', '.join(fields_types)}, " \
          f"_hash_row CHAR(32) UNIQUE, " \
          f"_imported_at DATETIME)"
    cur.execute(sql)
def ensure_table_structure(cur, table_name, header, sample_row):
    cur.execute("SHOW TABLES LIKE %s", (table_name,))
    if cur.fetchone():
        alter_table_add_columns(cur, table_name, header, sample_row)
    else:
        create_table(cur, table_name, header, sample_row)
def calc_row_hash_md5(header, row):
    values = [str(row.get(col, "")).strip() for col in header]
    return hashlib.md5("|".join(values).encode("utf-8")).hexdigest()
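# Deduplication sketch (illustrative values): for header ["Nome", "Importo"]
# and row {"Nome": "Rossi", "Importo": "100"} the hash is md5("Rossi|100");
# the UNIQUE _hash_row column plus INSERT IGNORE below make re-importing the
# same row a no-op.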
def insert_row(cur, table_name, header, row):
    hash_row = calc_row_hash_md5(header, row)
    cols = [f"`{col}`" for col in header]
    vals = [row.get(col, None) for col in header]
    cols += ["_hash_row", "_imported_at"]
    vals += [hash_row, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")]
    placeholders = ", ".join(["%s"] * len(cols))
    sql = f"INSERT IGNORE INTO `{table_name}` ({', '.join(cols)}) VALUES ({placeholders})"
    cur.execute(sql, vals)
def safe_name(s):
    return re.sub(r"[^a-zA-Z0-9_]", "_", str(s)).lower()
def parse_info_from_path(path):
    # Customize here if you want to derive the admin/condominium codes from the path.
    return "unknownadmin", "unknowncond"
def stream_csv_from_mdb(mdb_path, tab):
    # mdb-export prints the table as CSV with a header row by default,
    # which is what csv.DictReader expects below.
    proc = subprocess.Popen(["mdb-export", mdb_path, tab], stdout=subprocess.PIPE)
    for line in proc.stdout:
        yield line.decode(errors="ignore")
# --- EXTRACTION AND IN-MEMORY IMPORT ---
def process_mdb(mdb_path, admin_code, cond_code, cur):
    base = os.path.splitext(os.path.basename(mdb_path))[0]
    log_event("start_mdb", file=mdb_path)
    try:
        tables = subprocess.check_output(["mdb-tables", "-1", mdb_path]).decode().split()
    except Exception as e:
        log_event("error_mdb", file=mdb_path, error=str(e))
        print(f"Error reading tables: {e}")
        return
    for tab in tables:
        safe_tab = safe_name(tab)
        table_name = f"mdb_{safe_name(admin_code)}_{safe_name(cond_code)}_{safe_name(base)}_{safe_tab}"
        log_event("import_table", mdb=mdb_path, admin=admin_code, condominio=cond_code, table_mdb=tab, table_mysql=table_name)
        try:
            csv_stream = stream_csv_from_mdb(mdb_path, tab)
            reader = csv.DictReader(csv_stream, delimiter=",")
            first_row = None
            rows = []
            for row in reader:
                if first_row is None:
                    first_row = row
                rows.append(row)
            print(f"[DEBUG] Table {tab} - rows read: {len(rows)}")
            if not rows:
                log_event("empty_table", table=tab)
                continue
            ensure_table_structure(cur, table_name, first_row.keys(), first_row)
            imported = 0
            for row in rows:
                try:
                    insert_row(cur, table_name, first_row.keys(), row)
                    imported += 1
                except Exception as row_e:
                    log_event("row_error", table=table_name, data=row, error=str(row_e))
            log_event("imported_rows", table=table_name, rows=imported)
            print(f"Imported {imported} rows into {table_name}")
        except Exception as e:
            log_event("table_error", table=table_name, error=str(e))
            print(f"Error on table {tab}: {e}")
# --- MAIN ---
def main():
    conn = connect_mysql()
    cur = conn.cursor()
    for root, _, files in os.walk(INPUT_ROOT):
        for fname in files:
            if fname.lower().endswith(".mdb"):
                mdb_path = os.path.join(root, fname)
                admin_code, cond_code = parse_info_from_path(mdb_path)
                process_mdb(mdb_path, admin_code, cond_code, cur)
    cur.close()
    conn.close()
    log_event("import_complete")
if __name__ == "__main__":
main()