# netgescon-master/scripts/Script fatti per prova e per ora sospesi/import_netgescon_csv_mysql.py
#!/usr/bin/env python3
"""
GESCON CSV/MDB → MYSQL IMPORT SERVICE
========================================================================================
Project: NetGescon Importer Service
Version: 1.1 (2025-06-04)
Author: Pikappa2 & Copilot
Notes: Recursive import of CSV/MDB archives into MySQL.
- First run: CSV only (creates and populates the tables).
- Subsequent runs: updates the same tables from the MDB files (adds columns
  when needed, updates schema_netgescon.json).
- Advanced logging, mapping and handling of problematic columns in the style
  of the historical v18 script.
- Ready to run as a service (daemon/systemd).
========================================================================================
"""
import os
import csv
import json
import pymysql
import datetime
import subprocess
# Paths are resolved relative to this script so the service can be launched
# from any working directory (e.g. under systemd).
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SCHEMA_FILE = os.path.join(SCRIPT_DIR, "schema_netgescon.json")
CONFIG_PATH = os.path.join(SCRIPT_DIR, "../agent_config.json")
LOG_DIR = os.path.join(SCRIPT_DIR, "../log")
os.makedirs(LOG_DIR, exist_ok=True)
LOGFILE = os.path.join(LOG_DIR, "import_netgescon.log")
COLONNE_PROBLEMATICHE_LOG = os.path.join(LOG_DIR, "colonne_problematiche.json")

# Configuration is read once at import time; a missing or invalid
# agent_config.json deliberately aborts startup. Explicit UTF-8 so the JSON
# decodes the same way regardless of the host locale.
with open(CONFIG_PATH, encoding="utf-8") as f:
    config = json.load(f)
MYSQL_HOST = config.get("MySQLHost", "localhost")
MYSQL_DB = config.get("MySQLDatabase", "netgescon")
MYSQL_USER = config.get("MySQLUser", "root")
MYSQL_PW = config.get("MySQLPassword", "password")
DATA_ROOT = config.get("OutputDirectory", os.path.join(SCRIPT_DIR, "../estratti"))
def log_event(msg, **kwargs):
    """Append a structured JSON record to LOGFILE and echo it to stdout.

    Args:
        msg: Short event name, stored under the "event" key.
        **kwargs: Extra context fields merged into the record.
    """
    row = {"event": msg, "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    row.update(kwargs)
    print("[LOG]", row)
    # Explicit UTF-8: the record is serialized with ensure_ascii=False, so
    # relying on the locale's default encoding could raise UnicodeEncodeError
    # for non-ASCII data (table names, Italian error text).
    with open(LOGFILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(row, ensure_ascii=False) + "\n")
def log_colonna_problematica(tablename, colname, reason):
    """Record a problematic table/column in COLONNE_PROBLEMATICHE_LOG.

    The log file is a JSON list of records. Reading is best-effort: any
    read/parse failure restarts the list so logging never blocks the import.

    Args:
        tablename: Destination table name.
        colname: Column involved (may be None when unknown).
        reason: Human-readable description of the problem.
    """
    rec = {
        "table": tablename,
        "column": colname,
        "reason": reason,
        "timestamp": datetime.datetime.now().isoformat(),
    }
    try:
        if os.path.exists(COLONNE_PROBLEMATICHE_LOG):
            # Explicit UTF-8: entries are written with ensure_ascii=False.
            with open(COLONNE_PROBLEMATICHE_LOG, "r", encoding="utf-8") as f:
                data = json.load(f)
        else:
            data = []
    except Exception:
        # Corrupt/unreadable log: start over rather than abort the import.
        data = []
    data.append(rec)
    with open(COLONNE_PROBLEMATICHE_LOG, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
def connect_db():
    """Open a new autocommit MySQL connection using the module-level settings."""
    connection_params = {
        "host": MYSQL_HOST,
        "user": MYSQL_USER,
        "password": MYSQL_PW,
        "database": MYSQL_DB,
        "charset": "utf8mb4",
        "autocommit": True,
    }
    return pymysql.connect(**connection_params)
def get_create_table_sql(table, schema):
    """Build a CREATE TABLE IF NOT EXISTS statement from a schema entry.

    Args:
        table: Table name (wrapped in backticks, not otherwise escaped).
        schema: Dict with a "fields" mapping of column name -> properties:
            {"type": <SQL type>, optional "primary": bool,
             optional "foreign": {"table": ..., "field": ...}}.

    Returns:
        Complete DDL string (InnoDB, utf8mb4).
    """
    column_defs = []
    pk_columns = []
    fk_clauses = []
    for col_name, col_props in schema["fields"].items():
        definition = f"`{col_name}` {col_props['type']}"
        if col_props.get("primary"):
            # Primary-key columns must be NOT NULL.
            definition += " NOT NULL"
            pk_columns.append(col_name)
        column_defs.append(definition)
        if "foreign" in col_props:
            ref = col_props["foreign"]
            fk_clauses.append(
                f"FOREIGN KEY (`{col_name}`) REFERENCES `{ref['table']}`(`{ref['field']}`)"
            )
    parts = [f"CREATE TABLE IF NOT EXISTS `{table}` (\n " + ',\n '.join(column_defs)]
    if pk_columns:
        quoted = ', '.join(f'`{c}`' for c in pk_columns)
        parts.append(f",\n PRIMARY KEY({quoted})")
    if fk_clauses:
        parts.append(",\n " + ',\n '.join(fk_clauses))
    parts.append("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;")
    return "".join(parts)
def find_csv_file(base_dir, rel_path):
    """Locate a CSV file under base_dir.

    First tries base_dir/rel_path directly; when absent, walks the whole tree
    and returns the first file whose basename matches.

    Returns:
        Absolute path of the file, or None when not found anywhere.
    """
    direct = os.path.join(base_dir, rel_path)
    if os.path.exists(direct):
        return direct
    # Fallback: exported archives sometimes land in nested subfolders,
    # so search by basename across the entire tree.
    wanted = os.path.basename(rel_path)
    for folder, _subdirs, filenames in os.walk(base_dir):
        if wanted in filenames:
            return os.path.join(folder, wanted)
    return None
def import_csv(table, schema, db_conn, base_dir):
    """Populate `table` from its source CSV (schema["source_csv"]).

    Rows are inserted with INSERT IGNORE, so duplicates are skipped silently.
    A missing CSV is logged and skipped; per-row insert failures are logged
    and do not abort the import.

    Args:
        table: Destination MySQL table name.
        schema: Schema entry with "source_csv" and a "fields" mapping.
        db_conn: Open pymysql connection.
        base_dir: Root directory searched for the CSV file.
    """
    csv_file = find_csv_file(base_dir, schema["source_csv"])
    if not csv_file:
        log_event(f"CSV file for {table} not found at {schema['source_csv']}")
        return
    log_event("import_csv_start", table=table, file=csv_file)
    # The query is invariant per table: build it once outside the row loop.
    columns = list(schema["fields"].keys())
    col_sql = ", ".join(f"`{c}`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    query = f"INSERT IGNORE INTO `{table}` ({col_sql}) VALUES ({placeholders})"
    inserted = 0
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        cur = db_conn.cursor()
        for row in reader:
            vals = [row.get(c) for c in columns]
            try:
                cur.execute(query, vals)
                inserted += cur.rowcount  # 0 when INSERT IGNORE skipped a duplicate
            except Exception as e:
                # The failing column is unknown at this point; the previous
                # code blamed the loop's last column, which was misleading.
                log_colonna_problematica(table, None, f"Insert error: {str(e)}")
                log_event("insert_error", table=table, row=row, error=str(e))
        db_conn.commit()
    # Total successful inserts (previously this logged only the rowcount of
    # the last executed statement).
    log_event("imported_rows", table=table, rows=inserted)
def mdb_schema_extract(mdb_path):
    """Extract the table -> column-name mapping from an Access .mdb file.

    Relies on mdbtools (`mdb-tables`, `mdb-export`). Extraction errors are
    logged; a failing file yields {}, a failing table is skipped.

    Returns:
        dict mapping table name to list of column names.
    """
    try:
        tables = subprocess.check_output(["mdb-tables", "-1", mdb_path]).decode().split()
    except Exception as e:
        log_event("mdb_schema_error", file=mdb_path, error=str(e))
        return {}
    schema = {}
    for tab in tables:
        try:
            # BUG FIX: in mdbtools "-H" means --no-header (see the data export
            # below, which uses the long form), so the previous code exported
            # DATA rows and split the whole output on commas as the "header".
            # Export with the header included and parse only the first line.
            out = subprocess.check_output(["mdb-export", mdb_path, tab])
            lines = out.decode(errors="ignore").splitlines()
            header_line = lines[0] if lines else ""
            header = [h.strip().strip('"') for h in header_line.split(",")]
            schema[tab] = header
        except Exception as e:
            log_event("mdb_table_schema_error", file=mdb_path, table=tab, error=str(e))
    return schema
def update_schema_json_from_mdb(mdb_path, schema_json):
    """Merge tables/columns found in an MDB file into schema_json (in place).

    New tables are registered with every column typed VARCHAR(255); existing
    tables only gain their missing columns.

    Returns:
        True when schema_json was modified, False otherwise.
    """
    changed = False
    for table_name, columns in mdb_schema_extract(mdb_path).items():
        key = table_name.lower()
        entry = schema_json.get(key)
        if entry is None:
            # Unknown table: register it wholesale with generic column types.
            schema_json[key] = {
                "source_mdb": mdb_path,
                "fields": {c: {"type": "VARCHAR(255)"} for c in columns},
            }
            log_event("schema_added_table", table=key, from_mdb=True)
            changed = True
            continue
        # Known table: append only the columns not yet present.
        fields = entry["fields"]
        for c in columns:
            if c in fields:
                continue
            fields[c] = {"type": "VARCHAR(255)"}
            log_event("schema_added_column", table=key, column=c)
            changed = True
    return changed
def import_mdb_tables(mdb_path, schema_json, db_conn):
    """Create/update and populate MySQL tables from one MDB file.

    For every table reported by the MDB, ensures the MySQL table exists
    (per schema_json) and bulk-inserts its rows exported via `mdb-export`.
    Tables absent from schema_json are skipped with a log entry.

    Args:
        mdb_path: Path to the .mdb file.
        schema_json: Full schema dict (already updated from this MDB).
        db_conn: Open pymysql connection.
    """
    mdb_schema = mdb_schema_extract(mdb_path)
    for tabella, columns in mdb_schema.items():
        tabella_l = tabella.lower()
        if tabella_l not in schema_json:
            log_event("skip_import", table=tabella_l, reason="Not in schema after update")
            continue
        # Ensure the destination table exists with the current schema.
        sql = get_create_table_sql(tabella_l, schema_json[tabella_l])
        with db_conn.cursor() as cur:
            cur.execute(sql)
        try:
            # Export data rows only (--no-header); column order comes from
            # the header extracted by mdb_schema_extract.
            proc = subprocess.Popen(
                ["mdb-export", "--no-header", "-d", ",", mdb_path, tabella],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            out, err = proc.communicate()
            if proc.returncode != 0:
                log_event("mdb_export_error", table=tabella, error=err.decode())
                continue
            lines = out.decode(errors="ignore").splitlines()
            if not lines or not lines[0].strip():
                log_event("empty_table", table=tabella)
                continue
            reader = csv.DictReader(lines, fieldnames=columns)
            # The query is invariant per table: build it once outside the loop.
            col_sql = ", ".join(f"`{c}`" for c in columns)
            placeholders = ", ".join(["%s"] * len(columns))
            query = f"INSERT IGNORE INTO `{tabella_l}` ({col_sql}) VALUES ({placeholders})"
            cur = db_conn.cursor()
            inserted = 0
            for row in reader:
                vals = [row.get(c) for c in columns]
                try:
                    cur.execute(query, vals)
                    inserted += cur.rowcount  # 0 when INSERT IGNORE skipped
                except Exception as e:
                    # The failing column is unknown here; the previous code
                    # blamed the loop's last column, which was misleading.
                    log_colonna_problematica(tabella_l, None, f"Insert error: {str(e)}")
                    log_event("insert_error", table=tabella_l, row=row, error=str(e))
            db_conn.commit()
            # Total successful inserts (previously: rowcount of the last
            # statement only).
            log_event("imported_rows", table=tabella_l, rows=inserted)
        except Exception as e:
            log_event("mdb_import_error", table=tabella_l, error=str(e))
def main():
    """Run one full import pass: CSV bootstrap, then incremental MDB updates."""
    # 1. Load the persisted schema (empty on the very first run).
    #    Explicit UTF-8: the file is written with ensure_ascii=False below.
    if os.path.exists(SCHEMA_FILE):
        with open(SCHEMA_FILE, "r", encoding="utf-8") as f:
            schema = json.load(f)
    else:
        schema = {}
    db_conn = connect_db()
    # 2. Create tables and import CSVs (only tables already in the schema).
    for table, table_schema in schema.items():
        sql = get_create_table_sql(table, table_schema)
        with db_conn.cursor() as cur:
            cur.execute(sql)
        log_event("table_ready", table=table)
        import_csv(table, table_schema, db_conn, DATA_ROOT)
    # 3. Scan for MDB files and incrementally update schema + tables.
    mdb_files = []
    for root, _, files in os.walk(DATA_ROOT):
        for fname in files:
            if fname.lower().endswith(".mdb"):
                mdb_files.append(os.path.join(root, fname))
    schema_updated = False
    for mdb_path in mdb_files:
        log_event("scan_mdb", file=mdb_path)
        if update_schema_json_from_mdb(mdb_path, schema):
            schema_updated = True
        import_mdb_tables(mdb_path, schema, db_conn)
    # 4. Persist the schema only when it actually changed.
    if schema_updated:
        with open(SCHEMA_FILE, "w", encoding="utf-8") as f:
            json.dump(schema, f, indent=2, ensure_ascii=False)
        log_event("schema_updated", file=SCHEMA_FILE)
    db_conn.close()
    log_event("import_complete")


if __name__ == "__main__":
    main()