266 lines
10 KiB
Python
266 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SERVIZIO IMPORTAZIONE CSV/MDB GESCON → MYSQL
|
|
========================================================================================
|
|
Progetto: NetGescon Importer Service
|
|
Versione: 1.1 (2025-06-04)
|
|
Autore: Pikappa2 & Copilot
|
|
Commento: Importazione ricorsiva archivi CSV/MDB in MySQL.
|
|
- Prima importazione: solo CSV (creazione e popola tabelle).
|
|
- Successive: aggiorna le stesse tabelle dai MDB (aggiunge colonne se necessario, aggiorna schema_netgescon.json).
|
|
- Logging avanzato, mapping e gestione colonne problematiche in stile storico script v18.
|
|
- Pronto per lanciare come servizio (daemon/systemd).
|
|
========================================================================================
|
|
"""
|
|
|
|
import os
|
|
import csv
|
|
import json
|
|
import pymysql
|
|
import datetime
|
|
import subprocess
|
|
|
|
# Paths are resolved relative to this script so the importer behaves the same
# whether launched by hand, by cron, or as a systemd service.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SCHEMA_FILE = os.path.join(SCRIPT_DIR, "schema_netgescon.json")   # persisted table/column map
CONFIG_PATH = os.path.join(SCRIPT_DIR, "../agent_config.json")    # shared agent configuration
LOG_DIR = os.path.join(SCRIPT_DIR, "../log")
os.makedirs(LOG_DIR, exist_ok=True)
LOGFILE = os.path.join(LOG_DIR, "import_netgescon.log")           # JSON-lines event log
COLONNE_PROBLEMATICHE_LOG = os.path.join(LOG_DIR, "colonne_problematiche.json")

# MySQL credentials and the data root come from the agent config; the
# .get() defaults are fallbacks for a missing/partial config file.
# NOTE(review): this read happens at import time and raises if the config
# file is absent — presumably intentional for a service, confirm.
with open(CONFIG_PATH) as f:
    config = json.load(f)
MYSQL_HOST = config.get("MySQLHost", "localhost")
MYSQL_DB = config.get("MySQLDatabase", "netgescon")
MYSQL_USER = config.get("MySQLUser", "root")
MYSQL_PW = config.get("MySQLPassword", "password")
# Root directory scanned recursively for CSV extracts and MDB archives.
DATA_ROOT = config.get("OutputDirectory", os.path.join(SCRIPT_DIR, "../estratti"))
|
def log_event(msg, **kwargs):
    """Append a timestamped JSON record to LOGFILE and echo it to stdout.

    *msg* becomes the ``event`` field; any extra keyword arguments are
    merged into the same record.
    """
    row = {"event": msg, "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    row.update(kwargs)
    print("[LOG]", row)
    # Explicit UTF-8: the payload is serialized with ensure_ascii=False, so
    # relying on the locale's default encoding could raise on accented text.
    with open(LOGFILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(row, ensure_ascii=False) + "\n")
|
|
def log_colonna_problematica(tablename, colname, reason):
    """Append a problematic-column record to the JSON sidecar log.

    The whole log is read, extended, and rewritten; a missing, corrupt, or
    non-list file simply restarts the list instead of aborting the import.
    """
    rec = {
        "table": tablename,
        "column": colname,
        "reason": reason,
        "timestamp": datetime.datetime.now().isoformat(),
    }
    try:
        if os.path.exists(COLONNE_PROBLEMATICHE_LOG):
            # Explicit UTF-8 on both read and write: the file is dumped with
            # ensure_ascii=False, so the locale default encoding is unsafe.
            with open(COLONNE_PROBLEMATICHE_LOG, "r", encoding="utf-8") as f:
                data = json.load(f)
        else:
            data = []
    except Exception:
        # Unreadable/corrupt log: start fresh rather than lose the new record.
        data = []
    if not isinstance(data, list):
        # Defensive: the file could contain a valid but non-list JSON document.
        data = []
    data.append(rec)
    with open(COLONNE_PROBLEMATICHE_LOG, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
|
|
|
def connect_db():
    """Open an autocommitting pymysql connection using module-level credentials."""
    connection_params = dict(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PW,
        database=MYSQL_DB,
        charset="utf8mb4",
        autocommit=True,
    )
    return pymysql.connect(**connection_params)
|
|
|
|
def get_create_table_sql(table, schema):
    """Build a CREATE TABLE IF NOT EXISTS statement from a schema entry.

    ``schema["fields"]`` maps column name -> properties dict with ``type`` and
    optional ``primary`` / ``foreign`` keys; primary columns become NOT NULL.
    """
    column_defs = []
    pk_cols = []
    fk_clauses = []
    for col_name, col_props in schema["fields"].items():
        definition = f"`{col_name}` {col_props['type']}"
        if col_props.get("primary"):
            definition = definition + " NOT NULL"
            pk_cols.append(col_name)
        column_defs.append(definition)
        if "foreign" in col_props:
            ref = col_props["foreign"]
            fk_clauses.append(
                f"FOREIGN KEY (`{col_name}`) REFERENCES `{ref['table']}`(`{ref['field']}`)"
            )
    parts = [f"CREATE TABLE IF NOT EXISTS `{table}` (\n " + ',\n '.join(column_defs)]
    if pk_cols:
        quoted_keys = ', '.join(f'`{k}`' for k in pk_cols)
        parts.append(f",\n PRIMARY KEY({quoted_keys})")
    if fk_clauses:
        parts.append(",\n " + ',\n '.join(fk_clauses))
    parts.append("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;")
    return "".join(parts)
|
|
|
|
def find_csv_file(base_dir, rel_path):
    """Locate *rel_path* under *base_dir*, falling back to a recursive search.

    Returns the absolute/joined path of the first match by basename, or
    None when the file cannot be found anywhere below base_dir.
    """
    direct = os.path.join(base_dir, rel_path)
    if os.path.exists(direct):
        return direct
    # Fallback: hunt for the bare filename in every subdirectory.
    wanted = os.path.basename(rel_path)
    for dirpath, _subdirs, filenames in os.walk(base_dir):
        if wanted in filenames:
            return os.path.join(dirpath, wanted)
    return None
|
|
|
|
def import_csv(table, schema, db_conn, base_dir):
    """Populate *table* from its CSV source using per-row INSERT IGNORE.

    Rows that fail to insert are logged (event log + problematic-columns log)
    and skipped; the import continues with the next row.
    """
    csv_file = find_csv_file(base_dir, schema["source_csv"])
    if not csv_file:
        log_event(f"CSV file for {table} not found at {schema['source_csv']}")
        return
    log_event("import_csv_start", table=table, file=csv_file)
    # The column list is fixed by the schema, so build the statement once
    # instead of rebuilding it for every record (loop-invariant hoist).
    columns = list(schema["fields"].keys())
    col_sql = ", ".join(f"`{c}`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    query = f"INSERT IGNORE INTO `{table}` ({col_sql}) VALUES ({placeholders})"
    inserted = 0
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        with db_conn.cursor() as cur:  # was left unclosed before
            for row in reader:
                vals = [row.get(col, None) for col in columns]
                try:
                    cur.execute(query, vals)
                    inserted += cur.rowcount
                except Exception as e:
                    # We cannot tell WHICH column broke the insert (the old
                    # code logged whatever name the loop variable last held),
                    # so record the failure against the whole row.
                    log_colonna_problematica(table, "<row>", f"Insert error: {str(e)}")
                    log_event("insert_error", table=table, row=row, error=str(e))
    db_conn.commit()
    # Fix: cur.rowcount only reflected the LAST execute; report the total.
    log_event("imported_rows", table=table, rows=inserted)
|
|
|
|
def mdb_schema_extract(mdb_path):
    """Extract the table layout of an Access file via mdb-tools.

    Returns ``{table_name: [column, ...]}``; an empty dict if ``mdb-tables``
    fails. Tables whose export fails are skipped (and logged).
    """
    try:
        tables = subprocess.check_output(["mdb-tables", "-1", mdb_path]).decode().split()
    except Exception as e:
        log_event("mdb_schema_error", file=mdb_path, error=str(e))
        return {}
    schema = {}
    for tab in tables:
        try:
            # Export WITH the header row and keep only the first line.
            # Fix: the old code passed -H (which suppresses the header, just
            # like the --no-header used elsewhere in this script) and then
            # split the ENTIRE dump on commas, so the "columns" were really
            # comma-fragments of every data row.
            export = subprocess.check_output(["mdb-export", mdb_path, tab])
            lines = export.decode(errors="ignore").splitlines()
            if not lines:
                log_event("mdb_table_schema_error", file=mdb_path, table=tab,
                          error="empty export")
                continue
            schema[tab] = [h.strip() for h in lines[0].split(",")]
        except Exception as e:
            log_event("mdb_table_schema_error", file=mdb_path, table=tab, error=str(e))
    return schema
|
|
|
|
def update_schema_json_from_mdb(mdb_path, schema_json):
    """Merge tables/columns discovered in an MDB file into *schema_json*.

    Unknown tables are registered wholesale with default VARCHAR(255) column
    types; known tables only gain the columns they were missing. Returns
    True when anything in schema_json changed.
    """
    changed = False
    for raw_name, cols in mdb_schema_extract(mdb_path).items():
        key = raw_name.lower()
        if key not in schema_json:
            # New table: add it with an all-default field map.
            schema_json[key] = {
                "source_mdb": mdb_path,
                "fields": {c: {"type": "VARCHAR(255)"} for c in cols},
            }
            log_event("schema_added_table", table=key, from_mdb=True)
            changed = True
            continue
        # Existing table: only append columns that are not mapped yet.
        known_fields = schema_json[key]["fields"]
        for c in cols:
            if c in known_fields:
                continue
            known_fields[c] = {"type": "VARCHAR(255)"}
            log_event("schema_added_column", table=key, column=c)
            changed = True
    return changed
|
|
|
|
def import_mdb_tables(mdb_path, schema_json, db_conn):
    """Create/refresh MySQL tables from an MDB file and load their rows.

    Only tables already present in *schema_json* (after the schema-update
    pass) are imported; rows use INSERT IGNORE so existing keys are kept.
    Per-table failures are logged and do not stop the remaining tables.
    """
    mdb_schema = mdb_schema_extract(mdb_path)
    for tabella, columns in mdb_schema.items():
        tabella_l = tabella.lower()
        if tabella_l not in schema_json:
            log_event("skip_import", table=tabella_l, reason="Not in schema after update")
            continue
        # Make sure the destination table exists before loading data.
        sql = get_create_table_sql(tabella_l, schema_json[tabella_l])
        with db_conn.cursor() as cur:
            cur.execute(sql)
        try:
            # Dump the table as header-less CSV via mdb-tools.
            proc = subprocess.Popen(
                ["mdb-export", "--no-header", "-d", ",", mdb_path, tabella],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            out, err = proc.communicate()
            if proc.returncode != 0:
                log_event("mdb_export_error", table=tabella, error=err.decode())
                continue
            lines = out.decode(errors="ignore").splitlines()
            if not lines or not lines[0].strip():
                log_event("empty_table", table=tabella)
                continue
            reader = csv.DictReader(lines, fieldnames=columns)
            # The column list is fixed per table: build the INSERT once,
            # not once per row (loop-invariant hoist).
            col_sql = ", ".join(f"`{c}`" for c in columns)
            placeholders = ", ".join(["%s"] * len(columns))
            query = f"INSERT IGNORE INTO `{tabella_l}` ({col_sql}) VALUES ({placeholders})"
            inserted = 0
            with db_conn.cursor() as cur:  # was left unclosed before
                for row in reader:
                    vals = [row.get(col, None) for col in columns]
                    try:
                        cur.execute(query, vals)
                        inserted += cur.rowcount
                    except Exception as e:
                        # Column at fault is unknown; log against the row.
                        log_colonna_problematica(tabella_l, "<row>", f"Insert error: {str(e)}")
                        log_event("insert_error", table=tabella_l, row=row, error=str(e))
            db_conn.commit()
            # Fix: the old code reported cur.rowcount (last statement only).
            log_event("imported_rows", table=tabella_l, rows=inserted)
        except Exception as e:
            log_event("mdb_import_error", table=tabella_l, error=str(e))
|
|
|
|
def main():
    """Run a full import pass: CSV bootstrap, then incremental MDB merge.

    1. Load the persisted schema (empty on first run).
    2. Create tables and import CSVs for every table already in the schema.
    3. Scan DATA_ROOT for .mdb files, merge their layout into the schema,
       and import their rows.
    4. Persist the schema only when something actually changed.
    """
    # 1. Load the persisted schema.
    if os.path.exists(SCHEMA_FILE):
        with open(SCHEMA_FILE, "r", encoding="utf-8") as f:
            schema = json.load(f)
    else:
        schema = {}

    db_conn = connect_db()
    try:
        # 2. Create tables and import CSVs (only for tables already known).
        for table, table_schema in schema.items():
            sql = get_create_table_sql(table, table_schema)
            with db_conn.cursor() as cur:
                cur.execute(sql)
            log_event("table_ready", table=table)
            import_csv(table, table_schema, db_conn, DATA_ROOT)

        # 3. Scan for MDB archives and import them incrementally.
        mdb_files = []
        for root, _, files in os.walk(DATA_ROOT):
            for fname in files:
                if fname.lower().endswith(".mdb"):
                    mdb_files.append(os.path.join(root, fname))
        schema_updated = False
        for mdb_path in mdb_files:
            log_event("scan_mdb", file=mdb_path)
            if update_schema_json_from_mdb(mdb_path, schema):
                schema_updated = True
            import_mdb_tables(mdb_path, schema, db_conn)

        # 4. Persist the schema only if it changed.
        if schema_updated:
            with open(SCHEMA_FILE, "w", encoding="utf-8") as f:
                json.dump(schema, f, indent=2, ensure_ascii=False)
            log_event("schema_updated", file=SCHEMA_FILE)
    finally:
        # Fix: the connection used to leak if any step above raised.
        db_conn.close()
    log_event("import_complete")


if __name__ == "__main__":
    main()