200 lines
6.8 KiB
Python
200 lines
6.8 KiB
Python
#!/usr/bin/env python3
"""
import_mdb_to_mysql_advanced.py

Advanced script for importing data from MDB files into MySQL.
- Recursive search for MDB files (like the bash script).
- No temporary files: everything is done in memory.
- For each table:
  - Extracts the data directly via mdb-export, streaming.
  - Computes an MD5 hash of every row (as previously done with the CSVs).
  - Deduplicates: imports only new rows into MySQL, avoiding duplicates.
  - Updates the MySQL table structure if the data model changes.
- Advanced logging (JSON file).
- Ready to be used/extended in the future, including for Laravel routes / backend APIs.

Author: Pikappa2 & Copilot
Last updated: 2025-06-02
"""
|
|
|
|
import csv
import datetime
import hashlib
import json
import math
import os
import re
import subprocess

import pymysql
|
|
# --- CONFIGURATION ---

# All paths are resolved relative to the directory containing this script.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "../agent_config.json")
LOGDIR = os.path.join(SCRIPT_DIR, "../log")
os.makedirs(LOGDIR, exist_ok=True)
# One JSON object per line (JSON-lines log), appended to by log_event().
LOGFILE = os.path.join(LOGDIR, "import_mdb_to_mysql.jsonlog")

# Connection settings come from the external JSON config; each key has a
# fallback default so the script still starts with a partial config.
with open(CONFIG_PATH) as f:
    config = json.load(f)
MYSQL_HOST = config.get("MySQLHost", "localhost")
MYSQL_DB = config.get("MySQLDatabase", "netgescon")
MYSQL_USER = config.get("MySQLUser", "root")
MYSQL_PW = config.get("MySQLPassword", "password")
# Root directory scanned recursively for *.mdb files.
INPUT_ROOT = config.get("InputDirectory", SCRIPT_DIR)
|
|
# --- LOGGING ---
|
|
|
|
def log_event(event, **kwargs):
    """Append one JSON line to LOGFILE.

    The entry carries the event name, a human-readable timestamp, and any
    extra key/value details passed as keyword arguments.
    """
    entry = {
        "event": event,
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        **kwargs,
    }
    with open(LOGFILE, "a") as logfile:
        logfile.write(json.dumps(entry, ensure_ascii=False) + "\n")
|
|
|
|
# --- MYSQL UTILS ---
|
|
|
|
def connect_mysql():
    """Open an autocommitting utf8mb4 connection to the configured database."""
    connection_params = dict(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PW,
        database=MYSQL_DB,
        charset="utf8mb4",
        autocommit=True,
    )
    return pymysql.connect(**connection_params)
|
|
|
|
def infer_type(val):
    """Infer a MySQL column type from a single sample value.

    Returns:
        "INT" for integer-looking strings within MySQL's signed INT range,
        "BIGINT" for integers beyond that range (prevents overflow on insert),
        "FLOAT" for finite decimal values (comma or dot as separator),
        "TEXT" for strings longer than 255 characters,
        "VARCHAR(255)" otherwise (including None/empty values).
    """
    if val is None or str(val).strip() == "":
        return "VARCHAR(255)"
    s = str(val)
    if re.fullmatch(r"-?\d+", s):
        # MySQL signed INT range; anything wider needs BIGINT.
        return "INT" if -2147483648 <= int(s) <= 2147483647 else "BIGINT"
    try:
        # mdb-export may use a comma as the decimal separator.
        numeric = float(s.replace(",", "."))
        # float() also accepts "inf"/"nan" strings, which are not numeric
        # data in a CSV export — treat those as plain text instead.
        if math.isfinite(numeric):
            return "FLOAT"
    except ValueError:
        pass
    if len(s) > 255:
        return "TEXT"
    return "VARCHAR(255)"
|
|
|
|
def get_existing_columns(cur, table_name):
    """Return the set of column names of *table_name*.

    Returns an empty set when the table does not exist (or the query fails
    for any other reason).
    """
    try:
        cur.execute(f"SHOW COLUMNS FROM `{table_name}`;")
        return {column_row[0] for column_row in cur.fetchall()}
    except Exception:
        return set()
|
|
|
|
def alter_table_add_columns(cur, table_name, header, sample_row):
    """Add to the MySQL table any column present in *header* but missing
    from the table, inferring each new column's type from *sample_row*."""
    known_columns = get_existing_columns(cur, table_name)
    new_columns = [column for column in header if column not in known_columns]
    for column in new_columns:
        column_type = infer_type(sample_row.get(column, ""))
        cur.execute(
            f"ALTER TABLE `{table_name}` ADD COLUMN `{column}` {column_type} NULL"
        )
|
|
|
|
def create_table(cur, table_name, header, sample_row):
    """Create the MySQL table for an MDB export if it does not exist.

    Besides the data columns (types inferred from *sample_row*), the table
    gets a surrogate `id` primary key, a UNIQUE `_hash_row` column used for
    deduplication, and an `_imported_at` timestamp.
    """
    column_defs = ", ".join(
        f"`{column}` {infer_type(sample_row.get(column, ''))}" for column in header
    )
    cur.execute(
        f"CREATE TABLE IF NOT EXISTS `{table_name}` ("
        f"id INT AUTO_INCREMENT PRIMARY KEY, "
        f"{column_defs}, "
        f"_hash_row CHAR(32) UNIQUE, "
        f"_imported_at DATETIME)"
    )
|
|
|
|
def ensure_table_structure(cur, table_name, header, sample_row):
    """Create the table when absent; otherwise add any newly-seen columns."""
    cur.execute("SHOW TABLES LIKE %s", (table_name,))
    table_exists = cur.fetchone()
    if not table_exists:
        create_table(cur, table_name, header, sample_row)
    else:
        alter_table_add_columns(cur, table_name, header, sample_row)
|
|
|
|
def calc_row_hash_md5(header, row):
    """Return the MD5 hex digest of the row's values, taken in *header*
    order, stripped, and pipe-joined — used as the dedup key."""
    joined = "|".join(str(row.get(column, "")).strip() for column in header)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()
|
|
|
|
def insert_row(cur, table_name, header, row):
    """Insert one row, tagging it with its MD5 hash and the import time.

    INSERT IGNORE combined with the UNIQUE `_hash_row` column makes
    duplicate rows a silent no-op.
    """
    columns = [f"`{column}`" for column in header] + ["_hash_row", "_imported_at"]
    values = [row.get(column, None) for column in header]
    values.append(calc_row_hash_md5(header, row))
    values.append(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    marks = ", ".join(["%s"] * len(columns))
    cur.execute(
        f"INSERT IGNORE INTO `{table_name}` ({', '.join(columns)}) VALUES ({marks})",
        values,
    )
|
|
|
|
def safe_name(s):
    """Lower-case *s* after replacing every character that is not an ASCII
    letter, digit, or underscore with '_', for use in SQL identifiers."""
    cleaned = re.sub(r"[^a-zA-Z0-9_]", "_", str(s))
    return cleaned.lower()
|
|
|
|
def parse_info_from_path(path):
    """Placeholder hook: derive (admin_code, cond_code) from an MDB path.

    Currently returns fixed codes; customize here to extract real
    information from the directory layout.
    """
    admin_code = "unknownadmin"
    cond_code = "unknowncond"
    return admin_code, cond_code
|
|
|
|
def stream_csv_from_mdb(mdb_path, tab):
    """Yield the CSV export of table *tab* from *mdb_path*, line by line.

    mdb-export already emits CSV with a header row by default, which is
    exactly what csv.DictReader downstream needs. The previous invocation
    passed "-H" (suppress header) plus a stray "csv" positional argument,
    which both removed the header and shifted mdb-export's <file> <table>
    argument parsing.
    """
    proc = subprocess.Popen(
        ["mdb-export", mdb_path, tab],
        stdout=subprocess.PIPE,
    )
    try:
        for raw_line in proc.stdout:
            yield raw_line.decode(errors="ignore")
    finally:
        # Close the pipe and reap the child so no zombie process is left,
        # even if the consumer abandons the generator early.
        proc.stdout.close()
        proc.wait()
|
|
|
|
# --- ESTRAZIONE E IMPORT IN MEMORIA ---
|
|
|
|
def process_mdb(mdb_path, admin_code, cond_code, cur):
    """Import every table of one MDB file into MySQL.

    For each table: stream its CSV export, buffer the rows in memory,
    make sure the destination MySQL table exists with matching columns,
    then insert the rows one by one (duplicates are skipped via the
    row-hash UNIQUE constraint inside insert_row). Progress and errors
    are written to the JSON log; a failure on one table does not stop
    the others.
    """
    base = os.path.splitext(os.path.basename(mdb_path))[0]
    log_event("start_mdb", file=mdb_path)
    try:
        # "mdb-tables -1" prints one table name per line; NOTE(review):
        # .split() splits on any whitespace, so table names containing
        # spaces would be broken apart — confirm against real MDB files.
        tables = subprocess.check_output(["mdb-tables", "-1", mdb_path]).decode().split()
    except Exception as e:
        log_event("error_mdb", file=mdb_path, error=str(e))
        print(f"Errore lettura tabelle: {e}")
        return
    for tab in tables:
        safe_tab = safe_name(tab)
        # Destination table name encodes administrator, condominium,
        # source file and source table, all sanitized for SQL.
        table_name = f"mdb_{safe_name(admin_code)}_{safe_name(cond_code)}_{safe_name(base)}_{safe_tab}"
        log_event("import_table", mdb=mdb_path, admin=admin_code, condominio=cond_code, table_mdb=tab, table_mysql=table_name)
        try:
            csv_stream = stream_csv_from_mdb(mdb_path, tab)
            reader = csv.DictReader(csv_stream, delimiter=",")
            # Keep the first row as the sample used for type inference.
            first_row = None
            rows = []
            for row in reader:
                if not first_row:
                    first_row = row
                rows.append(row)
            print(f"[DEBUG] Tabella {tab} - Numero righe lette: {len(rows)}")
            if not rows:
                log_event("empty_table", table=tab)
                continue
            # Create the table or add newly-seen columns before inserting.
            ensure_table_structure(cur, table_name, first_row.keys(), first_row)
            imported = 0
            for row in rows:
                try:
                    insert_row(cur, table_name, first_row.keys(), row)
                    imported += 1
                except Exception as row_e:
                    # A bad row is logged and skipped; the import goes on.
                    log_event("row_error", table=table_name, data=row, error=str(row_e))
            log_event("imported_rows", table=table_name, rows=imported)
            print(f"Importate {imported} righe in {table_name}")
        except Exception as e:
            log_event("table_error", table=table_name, error=str(e))
            print(f"Errore su tabella {tab}: {e}")
|
|
|
|
# --- MAIN ---
|
|
|
|
def main():
    """Walk INPUT_ROOT recursively and import every *.mdb file found.

    The connection and cursor are now closed in finally blocks, so they
    are released even when an import raises (the original leaked both on
    any unhandled exception).
    """
    conn = connect_mysql()
    try:
        cur = conn.cursor()
        try:
            for root, _, files in os.walk(INPUT_ROOT):
                for fname in files:
                    # Case-insensitive match: .mdb, .MDB, ...
                    if fname.lower().endswith(".mdb"):
                        mdb_path = os.path.join(root, fname)
                        admin_code, cond_code = parse_info_from_path(mdb_path)
                        process_mdb(mdb_path, admin_code, cond_code, cur)
        finally:
            cur.close()
    finally:
        conn.close()
    log_event("import_complete")
|
|
|
|
# Run the import only when executed as a script, not when imported.
if __name__ == "__main__":
    main()