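# Source listing: netgescon-master/scripts/import_mdb_to_mysql.py
# (145 lines, 5.0 KiB, Python)
#
# Imports the tables of every .mdb file found under the configured input
# directory into MySQL, using the mdbtools command-line utilities.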

import os
import subprocess
import csv
import pymysql
import re
import json
from datetime import datetime
# ================== CONFIGURATION ==================
# All settings are read once, at import time, from a JSON document in the
# user's home directory.
CONFIG_PATH = os.path.expanduser("~/netgescon/agent_config.json")
# JSON text is UTF-8; decode it explicitly instead of relying on the platform
# locale encoding, which would garble non-ASCII paths or passwords.
with open(CONFIG_PATH, encoding="utf-8") as f:
    config = json.load(f)

# Required key: root directory that is walked recursively for .mdb files.
MDB_ROOT = config["InputDirectory"]

# Optional MySQL connection settings, with placeholder defaults.
MYSQL_HOST = config.get("MySQLHost", "localhost")
MYSQL_DB = config.get("MySQLDatabase", "netgescon")
MYSQL_USER = config.get("MySQLUser", "user")
MYSQL_PW = config.get("MySQLPassword", "password")

# When true, log messages are also printed to standard output.
DEBUG = config.get("Debug", False)

# The log directory is created up front so the first log() call cannot fail
# on a missing directory.
LOG_DIR = config.get("LogDirectory", os.path.expanduser("~/netgescon/log"))
LOG_PATH = os.path.join(LOG_DIR, "import_mdb_to_mysql.log")
os.makedirs(LOG_DIR, exist_ok=True)
# ================== LOGGING ==================
def log(msg):
    """Append a timestamped message to the log file; echo it when DEBUG is set."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {msg}"
    with open(LOG_PATH, "a", encoding="utf-8") as logfile:
        logfile.write(entry + "\n")
    if DEBUG:
        print(entry)
# ================== MYSQL UTILS ==================
def connect_mysql():
    """Open and return a new MySQL connection built from the module settings.

    The connection uses the utf8mb4 character set and has autocommit enabled,
    so every statement is committed as soon as it is executed.
    """
    settings = {
        "host": MYSQL_HOST,
        "user": MYSQL_USER,
        "password": MYSQL_PW,
        "database": MYSQL_DB,
        "charset": "utf8mb4",
        "autocommit": True,
    }
    return pymysql.connect(**settings)
def table_exists(cur, name):
    """Return True when a table called *name* exists in the current database.

    The lookup is an exact comparison against ``information_schema.tables``.
    ``SHOW TABLES LIKE`` is deliberately avoided: it treats ``_`` and ``%``
    in the name as wildcards, so a name such as ``tab_a`` would also be
    reported as existing when only ``tabXa`` is present.

    Args:
        cur: An open DB-API cursor on the target MySQL connection.
        name: The table name to look for.

    Returns:
        bool: True if the query returned a row, False otherwise.
    """
    cur.execute(
        "SELECT 1 FROM information_schema.tables "
        "WHERE table_schema = DATABASE() AND table_name = %s LIMIT 1",
        (name,),
    )
    return cur.fetchone() is not None
def create_table(cur, tablename, header, samples):
    """Create the MySQL table for an imported MDB table.

    Each column type is proposed by ``infer_type`` from the matching sample
    value. An auto-increment surrogate primary key is added in front of the
    imported columns.

    Args:
        cur: An open DB-API cursor on the target MySQL connection.
        tablename: Name of the table to create.
        header: Column names, in source order.
        samples: One sample value per column, aligned with *header*.

    Returns:
        list: The column names taken from *header*, in order (the surrogate
        key is not included).
    """
    log(f"Creazione tabella `{tablename}`")
    field_types = [(h, infer_type(s)) for h, s in zip(header, samples)]

    def quote(identifier):
        # A backtick inside a quoted identifier must be doubled, otherwise it
        # terminates the identifier and corrupts the statement.
        return "`" + identifier.replace("`", "``") + "`"

    # MySQL column names are case-insensitive, so a source column called
    # "ID" would clash with the surrogate key and make the CREATE fail.
    # Keep "id" whenever it is free; otherwise prefix underscores until the
    # name is unused.
    taken = {h.lower() for h, _ in field_types}
    pk_name = "id"
    while pk_name in taken:
        pk_name = "_" + pk_name

    columns_sql = ", ".join(f"{quote(h)} {t}" for h, t in field_types)
    definitions = [f"{quote(pk_name)} INT AUTO_INCREMENT PRIMARY KEY"]
    if columns_sql:
        definitions.append(columns_sql)
    sql = (
        f"CREATE TABLE IF NOT EXISTS {quote(tablename)} "
        f"({', '.join(definitions)}) "
        "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
    )
    cur.execute(sql)
    log(f"Tabella `{tablename}` creata con colonne: {columns_sql}")
    return [h for h, _ in field_types]
def insert_row(cur, tablename, fields, row):
    """Insert one row into *tablename*, logging (not raising) on failure.

    Values are looked up in *row* by column name; a column missing from the
    row is inserted as NULL. Any error raised by the insert is written to
    the log together with the offending values, and the import carries on.
    """
    values = [row.get(column) for column in fields]
    markers = ", ".join("%s" for _ in fields)
    column_list = ", ".join("`" + column + "`" for column in fields)
    sql = f"INSERT INTO `{tablename}` ({column_list}) VALUES ({markers})"
    try:
        cur.execute(sql, values)
    except Exception as e:
        log(f"[ERRORE] Insert in {tablename}: {e} | Valori: {values}")
# ================== TIPO CAMPO ==================
def infer_type(val):
    """Propose a MySQL column type from a single sample value.

    Rules, applied in order:

    * ``None`` or an empty string gives ``VARCHAR(255)``.
    * An optionally negative integer gives ``INT`` when it fits in 32 bits
      and ``BIGINT`` when it fits in 64 bits. Digit strings with a leading
      zero (postal codes, VAT numbers, ...) are kept as text so the zeros
      are not lost, and so are integers too large for ``BIGINT``.
    * An optionally negative decimal number gives ``DOUBLE``; ``FLOAT`` is
      single precision and would round values such as monetary amounts.
    * A valid ``YYYY-MM-DD`` calendar date gives ``DATE``.
    * Anything else gives ``TEXT`` when longer than 255 characters and
      ``VARCHAR(255)`` otherwise.

    Args:
        val: The sample value as a string, or None.

    Returns:
        str: The proposed MySQL type.
    """
    if val is None or val == "":
        return "VARCHAR(255)"

    if re.fullmatch(r"-?[0-9]+", val):
        digits = val.lstrip("-")
        has_leading_zero = len(digits) > 1 and digits.startswith("0")
        # 2**63 has 19 digits; longer strings cannot fit and are not parsed.
        if not has_leading_zero and len(digits) <= 19:
            number = int(val)
            if -(2**31) <= number < 2**31:
                return "INT"
            if -(2**63) <= number < 2**63:
                return "BIGINT"
        # Otherwise fall through to the length-based text types below.
    elif re.fullmatch(r"-?[0-9]+\.[0-9]+", val):
        return "DOUBLE"
    elif re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", val):
        # The pattern alone accepts impossible dates such as 2024-13-45.
        try:
            datetime.strptime(val, "%Y-%m-%d")
        except ValueError:
            pass
        else:
            return "DATE"

    return "TEXT" if len(val) > 255 else "VARCHAR(255)"
# ================== MDBTOOLS UTILS ==================
def process_mdb(mdb_path, cur):
    """Import every table of one MDB file into MySQL.

    The table list comes from ``mdb-tables`` and each table is streamed as
    CSV from ``mdb-export``. The first record of a table serves both as the
    type sample used when the MySQL table has to be created and as the first
    inserted row. Empty tables are skipped.

    Args:
        mdb_path: Path of the .mdb file to import.
        cur: An open DB-API cursor on the target MySQL connection.

    Returns:
        None. Failures to list or read tables are logged, not raised.
    """
    log(f"Inizio importazione {mdb_path}")
    try:
        # 1. List the tables. "-1" prints one name per line, so split on
        #    line boundaries only: table names may contain spaces.
        listing = subprocess.check_output(["mdb-tables", "-1", mdb_path])
        tables = [
            raw.decode(errors="replace")
            for raw in listing.splitlines()
            if raw.strip()
        ]
    except Exception as e:
        log(f"[ERRORE] Lettura tabelle MDB: {e}")
        return

    for tab in tables:
        log(f"Import tabella: {tab}")
        # 2. Start the export. "-H" must NOT be passed: it suppresses the
        #    header row, and DictReader would then consume the first data
        #    record as column names.
        try:
            proc = subprocess.Popen(
                ["mdb-export", mdb_path, tab], stdout=subprocess.PIPE
            )
        except Exception as e:
            log(f"[ERRORE] Lettura tabella {tab}: {e}")
            continue

        # The context manager closes the pipe and waits for the child on
        # every exit path, so no process or file descriptor is leaked.
        with proc:
            # Replace undecodable bytes instead of aborting mid-table.
            lines = (raw.decode("utf-8", errors="replace") for raw in proc.stdout)
            reader = csv.DictReader(lines, delimiter=",")
            try:
                sample = next(reader)
            except StopIteration:
                log(f"Tabella {tab} vuota, saltata.")
                continue
            except Exception as e:
                log(f"[ERRORE] Lettura tabella {tab}: {e}")
                continue

            header = list(sample.keys())
            samples = [sample[h] for h in header]

            # 3. Create the table when it does not exist yet.
            if not table_exists(cur, tab):
                fields = create_table(cur, tab, header, samples)
            else:
                fields = header
                log(f"Tabella `{tab}` già esistente.")

            # 4. Insert the sample record first, then the remaining rows.
            insert_row(cur, tab, fields, sample)
            row_count = 1
            for row in reader:
                insert_row(cur, tab, fields, row)
                row_count += 1

        # A non-zero exit status means the export may be incomplete.
        if proc.returncode:
            log(
                f"[ERRORE] mdb-export ha restituito codice {proc.returncode} "
                f"per la tabella {tab}"
            )
        log(f"Tabella {tab}: {row_count} righe importate.")
def main():
    """Import every .mdb file found under MDB_ROOT into MySQL.

    The input directory is walked recursively; files are matched on a
    case-insensitive ``.mdb`` extension and handed to ``process_mdb``. The
    cursor and the connection are closed even if the import raises.
    """
    log("==== AVVIO IMPORT MDB → MYSQL ====")
    conn = connect_mysql()
    file_count = 0
    try:
        cur = conn.cursor()
        try:
            for root, _, files in os.walk(MDB_ROOT):
                for fname in files:
                    if not fname.lower().endswith(".mdb"):
                        continue
                    file_count += 1
                    process_mdb(os.path.join(root, fname), cur)
        finally:
            cur.close()
    finally:
        conn.close()
    log(f"Importazione completata. {file_count} file MDB processati.")
# Run the import only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()